Skip to content

Commit 746f096

Browse files
committed
Feedback-1 InternalLegacyOverride
1 parent 0142f15 commit 746f096

File tree

6 files changed

+383
-135
lines changed

6 files changed

+383
-135
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
from aws_dbesdk_dynamodb.internaldafny.generated.AwsCryptographyDbEncryptionSdkDynamoDbItemEncryptorTypes import (
4+
DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig,
5+
Error_DynamoDbItemEncryptorException,
6+
DecryptItemInput_DecryptItemInput,
7+
EncryptItemInput_EncryptItemInput,
8+
)
9+
from aws_dbesdk_dynamodb.internaldafny.generated.AwsCryptographyDbEncryptionSdkStructuredEncryptionTypes import (
10+
CryptoAction_ENCRYPT__AND__SIGN,
11+
CryptoAction_SIGN__ONLY,
12+
CryptoAction_DO__NOTHING,
13+
)
14+
from aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb.references import (
15+
ILegacyDynamoDbEncryptor,
16+
)
17+
import smithy_dafny_standard_library.internaldafny.generated.Wrappers as Wrappers
18+
import _dafny
19+
20+
import aws_dbesdk_dynamodb.internaldafny.generated.InternalLegacyOverride
21+
from aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor.models import (
22+
EncryptItemInput,
23+
EncryptItemOutput,
24+
DecryptItemOutput,
25+
DecryptItemInput,
26+
)
27+
28+
29+
try:
30+
from dynamodb_encryption_sdk.encrypted.client import EncryptedClient
31+
from dynamodb_encryption_sdk.structures import EncryptionContext, AttributeActions
32+
from dynamodb_encryption_sdk.identifiers import CryptoAction
33+
from dynamodb_encryption_sdk.encrypted import CryptoConfig
34+
from dynamodb_encryption_sdk.internal.identifiers import ReservedAttributes
35+
36+
_HAS_LEGACY_DDBEC = True
37+
except ImportError:
38+
_HAS_LEGACY_DDBEC = False
39+
40+
41+
class InternalLegacyOverride(aws_dbesdk_dynamodb.internaldafny.generated.InternalLegacyOverride.InternalLegacyOverride):
42+
@staticmethod
43+
def Build(config: DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig):
44+
if config.legacyOverride.is_None:
45+
return InternalLegacyOverride.CreateBuildSuccess(InternalLegacyOverride.CreateInternalLegacyOverrideNone())
46+
47+
legacy_override = config.legacyOverride.value
48+
49+
# Precondition: The encryptor MUST be a DynamoDBEncryptor
50+
if not _HAS_LEGACY_DDBEC:
51+
return InternalLegacyOverride.CreateBuildFailure(
52+
InternalLegacyOverride.CreateError("Could not find aws-dynamodb-encryption-python installation")
53+
)
54+
if not isinstance(legacy_override.encryptor, EncryptedClient):
55+
return InternalLegacyOverride.CreateBuildFailure(
56+
InternalLegacyOverride.CreateError("Legacy encryptor is not supported")
57+
)
58+
59+
# Preconditions: MUST be able to create valid encryption context
60+
maybe_encryption_context = InternalLegacyOverride.legacyEncryptionContext(config)
61+
if maybe_encryption_context.is_Failure:
62+
return maybe_encryption_context
63+
64+
# Precondition: All actions MUST be supported types
65+
maybe_actions = InternalLegacyOverride.legacyActions(legacy_override.attributeActionsOnEncrypt)
66+
if maybe_actions.is_Failure:
67+
return maybe_actions
68+
69+
# Create and return the legacy override instance
70+
legacy_instance = InternalLegacyOverride()
71+
legacy_instance.encryptor = legacy_override.encryptor
72+
legacy_instance.policy = legacy_override.policy
73+
legacy_instance.crypto_config = CryptoConfig(
74+
materials_provider=legacy_override.encryptor._materials_provider,
75+
encryption_context=maybe_encryption_context.value,
76+
attribute_actions=maybe_actions.value,
77+
)
78+
return InternalLegacyOverride.CreateBuildSuccess(
79+
InternalLegacyOverride.CreateInternalLegacyOverrideSome(legacy_instance)
80+
)
81+
82+
def __init__(self):
83+
super().__init__()
84+
self.encryptor = None
85+
self.crypto_config = None
86+
self.policy = None
87+
88+
@staticmethod
89+
def legacyEncryptionContext(config: DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig):
90+
"""Create the legacy encryption context from the config."""
91+
try:
92+
# Convert Dafny types to Python strings for the encryption context
93+
table_name = _dafny.string_of(config.logicalTableName)
94+
partition_key_name = _dafny.string_of(config.partitionKeyName)
95+
sort_key_name = _dafny.string_of(config.sortKeyName.value) if config.sortKeyName.is_Some else None
96+
97+
# Create the legacy encryption context with the extracted values
98+
encryption_context = EncryptionContext(
99+
table_name=table_name,
100+
partition_key_name=partition_key_name,
101+
sort_key_name=sort_key_name,
102+
)
103+
104+
return InternalLegacyOverride.CreateBuildSuccess(encryption_context)
105+
except Exception as e:
106+
# Return a failure with the error message if any exception occurs
107+
return InternalLegacyOverride.CreateBuildFailure(InternalLegacyOverride.CreateError(str(e)))
108+
109+
@staticmethod
110+
def legacyActions(attribute_actions_on_encrypt):
111+
"""Create the legacy attribute actions from the config."""
112+
try:
113+
# Create a new AttributeActions with default ENCRYPT_AND_SIGN
114+
legacy_actions = AttributeActions(default_action=CryptoAction.ENCRYPT_AND_SIGN)
115+
116+
# Map the action from the config to legacy actions
117+
attribute_actions = {}
118+
for key, action in attribute_actions_on_encrypt.items:
119+
# Convert the string key to Python string
120+
key_str = _dafny.string_of(key)
121+
122+
# Map the action type to the appropriate CryptoAction
123+
if action == CryptoAction_ENCRYPT__AND__SIGN():
124+
attribute_actions[key_str] = CryptoAction.ENCRYPT_AND_SIGN
125+
elif action == CryptoAction_SIGN__ONLY():
126+
attribute_actions[key_str] = CryptoAction.SIGN_ONLY
127+
elif action == CryptoAction_DO__NOTHING():
128+
attribute_actions[key_str] = CryptoAction.DO_NOTHING
129+
else:
130+
return InternalLegacyOverride.CreateBuildFailure(
131+
InternalLegacyOverride.CreateError(f"Unknown action type: {action}")
132+
)
133+
134+
# Update the attribute_actions dictionary
135+
legacy_actions.attribute_actions = attribute_actions
136+
return InternalLegacyOverride.CreateBuildSuccess(legacy_actions)
137+
except Exception as e:
138+
return InternalLegacyOverride.CreateBuildFailure(InternalLegacyOverride.CreateError(str(e)))
139+
140+
def EncryptItem(self, input: EncryptItemInput_EncryptItemInput):
141+
"""Encrypt an item using the legacy DynamoDB encryptor.
142+
143+
Args:
144+
input: EncryptItemInput containing the plaintext item to encrypt
145+
146+
Returns:
147+
Result containing the encrypted item or an error
148+
"""
149+
try:
150+
# Check policy
151+
if not self.policy.is_FORCE__LEGACY__ENCRYPT__ALLOW__LEGACY__DECRYPT:
152+
return Wrappers.Result_Failure(
153+
InternalLegacyOverride.CreateError("Legacy policy does not support encrypt")
154+
)
155+
156+
# Get the Native Plaintext Item
157+
native_input = aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor.dafny_to_smithy.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor_EncryptItemInput(
158+
input
159+
)
160+
161+
# Use the encryptor to encrypt the item using the instance attributes
162+
encrypted_item = self.encryptor._encrypt_item(
163+
item=native_input.plaintext_item,
164+
crypto_config=self.crypto_config.with_item(native_input.plaintext_item),
165+
)
166+
167+
# Return the encrypted item
168+
# The legacy encryption client returns items in the format that Dafny expects,
169+
# so no additional conversion is needed here
170+
native_output = EncryptItemOutput(encrypted_item=encrypted_item, parsed_header=None)
171+
dafny_output = aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor.smithy_to_dafny.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor_EncryptItemOutput(
172+
native_output
173+
)
174+
return Wrappers.Result_Success(dafny_output)
175+
176+
except Exception as e:
177+
# Return an appropriate error result with the exception details
178+
return Wrappers.Result_Failure(InternalLegacyOverride.CreateError(f"Error during encryption: {str(e)}"))
179+
180+
def DecryptItem(self, input: DecryptItemInput_DecryptItemInput):
181+
"""Decrypt an item using the legacy DynamoDB encryptor.
182+
183+
Args:
184+
input: DecryptItemInput containing the encrypted item to decrypt
185+
186+
Returns:
187+
Result containing the decrypted item or an error
188+
"""
189+
try:
190+
# Check policy
191+
if not (
192+
self.policy.is_FORCE__LEGACY__ENCRYPT__ALLOW__LEGACY__DECRYPT
193+
or self.policy.is_FORBID__LEGACY__ENCRYPT__ALLOW__LEGACY__DECRYPT
194+
):
195+
return Wrappers.Result_Failure(
196+
InternalLegacyOverride.CreateError("Legacy policy does not support decrypt")
197+
)
198+
199+
# Get the Native DecryptItemInput
200+
native_input: DecryptItemInput = (
201+
aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor.dafny_to_smithy.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor_DecryptItemInput(
202+
input
203+
)
204+
)
205+
# Use the encryptor to decrypt the item using the instance attributes
206+
decrypted_item = self.encryptor._decrypt_item(
207+
item=native_input.encrypted_item,
208+
crypto_config=self.crypto_config.with_item(native_input.encrypted_item),
209+
)
210+
211+
native_output = DecryptItemOutput(plaintext_item=decrypted_item, parsed_header=None)
212+
dafny_output = aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor.smithy_to_dafny.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor_DecryptItemOutput(
213+
native_output
214+
)
215+
return Wrappers.Result_Success(dafny_output)
216+
except Exception as e:
217+
# Return an appropriate error result with the exception details
218+
return Wrappers.Result_Failure(InternalLegacyOverride.CreateError(f"Error during decryption: {str(e)}"))
219+
220+
def IsLegacyInput(self, input: DecryptItemInput_DecryptItemInput):
221+
"""
222+
Determine if the input is from a legacy client.
223+
224+
Args:
225+
input: The decrypt item input to check
226+
227+
Returns:
228+
Boolean indicating if the input is from a legacy client
229+
"""
230+
try:
231+
if not _HAS_LEGACY_DDBEC:
232+
return False
233+
234+
if not input.is_DecryptItemInput:
235+
return False
236+
237+
# Get the Native DecryptItemInput
238+
native_input: DecryptItemInput = (
239+
aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor.dafny_to_smithy.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor_DecryptItemInput(
240+
input
241+
)
242+
)
243+
# = specification/dynamodb-encryption-client/decrypt-item.md#determining-legacy-items
244+
## An item MUST be determined to be encrypted under the legacy format if it contains
245+
## attributes for the material description and the signature.
246+
return (
247+
"*amzn-ddb-map-desc*" in native_input.encrypted_item
248+
and "*amzn-ddb-map-sig*" in native_input.encrypted_item
249+
)
250+
251+
except Exception as e:
252+
# If we encounter any error during detection, default to not using legacy
253+
return Wrappers.Result_Failure(InternalLegacyOverride.CreateError(f"Error in IsLegacyInput: {e}"))
254+
255+
@staticmethod
256+
def CreateError(message):
257+
"""Create an Error with the given message."""
258+
return Error_DynamoDbItemEncryptorException(message)
259+
260+
261+
aws_dbesdk_dynamodb.internaldafny.generated.InternalLegacyOverride.InternalLegacyOverride = InternalLegacyOverride

0 commit comments

Comments
 (0)