From 0b97693f098e210141d8b58ba224c1ac6355206d Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Tue, 25 Jul 2023 14:39:50 +0200 Subject: [PATCH 1/6] fix(parameters): make cache aware of single vs multiple calls Signed-off-by: heitorlessa --- aws_lambda_powertools/utilities/parameters/base.py | 2 +- aws_lambda_powertools/utilities/parameters/types.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/parameters/base.py b/aws_lambda_powertools/utilities/parameters/base.py index e4be9d33cdc..78bf865faf0 100644 --- a/aws_lambda_powertools/utilities/parameters/base.py +++ b/aws_lambda_powertools/utilities/parameters/base.py @@ -27,7 +27,7 @@ from aws_lambda_powertools.shared import constants, user_agent from aws_lambda_powertools.shared.functions import resolve_max_age -from aws_lambda_powertools.utilities.parameters.types import TransformOptions +from aws_lambda_powertools.utilities.parameters.types import RecursiveOptions, TransformOptions from .exceptions import GetParameterError, TransformParameterError diff --git a/aws_lambda_powertools/utilities/parameters/types.py b/aws_lambda_powertools/utilities/parameters/types.py index 6a15873c496..2dbf1593d72 100644 --- a/aws_lambda_powertools/utilities/parameters/types.py +++ b/aws_lambda_powertools/utilities/parameters/types.py @@ -1,3 +1,4 @@ from typing_extensions import Literal TransformOptions = Literal["json", "binary", "auto", None] +RecursiveOptions = Literal[True, False] From 7991884d61653dc5837c4051c9af7f6778c5fc26 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Tue, 25 Jul 2023 15:16:51 +0200 Subject: [PATCH 2/6] chore: cleanup, add test for single and nested Signed-off-by: heitorlessa --- aws_lambda_powertools/utilities/parameters/base.py | 2 +- aws_lambda_powertools/utilities/parameters/types.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/parameters/base.py b/aws_lambda_powertools/utilities/parameters/base.py index 78bf865faf0..e4be9d33cdc 100644 --- a/aws_lambda_powertools/utilities/parameters/base.py +++ b/aws_lambda_powertools/utilities/parameters/base.py @@ -27,7 +27,7 @@ from aws_lambda_powertools.shared import constants, user_agent from aws_lambda_powertools.shared.functions import resolve_max_age -from aws_lambda_powertools.utilities.parameters.types import RecursiveOptions, TransformOptions +from aws_lambda_powertools.utilities.parameters.types import TransformOptions from .exceptions import GetParameterError, TransformParameterError diff --git a/aws_lambda_powertools/utilities/parameters/types.py b/aws_lambda_powertools/utilities/parameters/types.py index 2dbf1593d72..6a15873c496 100644 --- a/aws_lambda_powertools/utilities/parameters/types.py +++ b/aws_lambda_powertools/utilities/parameters/types.py @@ -1,4 +1,3 @@ from typing_extensions import Literal TransformOptions = Literal["json", "binary", "auto", None] -RecursiveOptions = Literal[True, False] From 5e54c25868c279635ca1ff7ec1104a04f7a44b7a Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Wed, 2 Aug 2023 10:42:52 +0200 Subject: [PATCH 3/6] refactor(e2e-utils): raise upon invocation error --- tests/e2e/utils/data_fetcher/common.py | 35 ++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/tests/e2e/utils/data_fetcher/common.py b/tests/e2e/utils/data_fetcher/common.py index 3bbdaa24e6a..bab4f8c75fb 100644 --- a/tests/e2e/utils/data_fetcher/common.py +++ b/tests/e2e/utils/data_fetcher/common.py @@ -14,10 +14,45 @@ def get_lambda_response( lambda_arn: str, payload: Optional[str] = None, client: Optional[LambdaClient] = None, + raise_on_error: bool = True, ) -> Tuple[InvocationResponseTypeDef, datetime]: + """Invoke function synchronously + + Parameters + ---------- + lambda_arn : str + Lambda function ARN to invoke + payload : Optional[str], optional + JSON payload for Lambda invocation, by default None + client : Optional[LambdaClient], optional + Boto3 Lambda SDK client, by default None + raise_on_error : bool, optional + Whether to raise exception upon invocation error, by default True + + Returns + ------- + Tuple[InvocationResponseTypeDef, datetime] + Function response and approximate execution time + + Raises + ------ + RuntimeError + Function invocation error details + """ client = client or boto3.client("lambda") payload = payload or "" execution_time = datetime.utcnow() + + response: InvocationResponseTypeDef = client.invoke( + FunctionName=lambda_arn, + InvocationType="RequestResponse", + Payload=payload, + ) + has_error = response.get("FunctionError", "") == "Unhandled" + if has_error and raise_on_error: + error_payload = response["Payload"].read().decode() + raise RuntimeError(f"Function failed invocation: {error_payload}") + return client.invoke(FunctionName=lambda_arn, InvocationType="RequestResponse", Payload=payload), execution_time From 204342dcd52def19d5323726a7b0c58a0312b1a5 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Wed, 2 Aug 2023 13:39:38 +0200 Subject: [PATCH 4/6] refactor(e2e-utils): make parallel executions maintainable --- .../idempotency/test_idempotency_dynamodb.py | 26 +++++--- tests/e2e/utils/data_builder/traces.py | 2 +- tests/e2e/utils/data_fetcher/common.py | 61 +++++++++++++++++-- tests/e2e/utils/functions.py | 32 ---------- 4 files changed, 76 insertions(+), 45 deletions(-) delete mode 100644 tests/e2e/utils/functions.py diff --git a/tests/e2e/idempotency/test_idempotency_dynamodb.py b/tests/e2e/idempotency/test_idempotency_dynamodb.py index a374f763bd4..1d61cb69f9f 100644 --- a/tests/e2e/idempotency/test_idempotency_dynamodb.py +++ b/tests/e2e/idempotency/test_idempotency_dynamodb.py @@ -4,7 +4,7 @@ import pytest from tests.e2e.utils import data_fetcher -from tests.e2e.utils.functions import execute_lambdas_in_parallel +from tests.e2e.utils.data_fetcher.common import GetLambdaResponseOptions, get_lambda_response_in_parallel @pytest.fixture @@ -73,14 +73,21 @@ def test_ttl_caching_expiration_idempotency(ttl_cache_expiration_handler_fn_arn: @pytest.mark.xdist_group(name="idempotency") def test_ttl_caching_timeout_idempotency(ttl_cache_timeout_handler_fn_arn: str): # GIVEN - payload_timeout_execution = json.dumps({"sleep": 5, "message": "Powertools for AWS Lambda (Python) - TTL 1s"}) - payload_working_execution = json.dumps({"sleep": 0, "message": "Powertools for AWS Lambda (Python) - TTL 1s"}) + payload_timeout_execution = json.dumps( + {"sleep": 5, "message": "Powertools for AWS Lambda (Python) - TTL 1s"}, + sort_keys=True, + ) + payload_working_execution = json.dumps( + {"sleep": 0, "message": "Powertools for AWS Lambda (Python) - TTL 1s"}, + sort_keys=True, + ) # WHEN # first call should fail due to timeout execution_with_timeout, _ = data_fetcher.get_lambda_response( lambda_arn=ttl_cache_timeout_handler_fn_arn, payload=payload_timeout_execution, + raise_on_error=False, ) execution_with_timeout_response = execution_with_timeout["Payload"].read().decode("utf-8") @@ -99,12 +106,15 @@ def test_ttl_caching_timeout_idempotency(ttl_cache_timeout_handler_fn_arn: str): @pytest.mark.xdist_group(name="idempotency") def test_parallel_execution_idempotency(parallel_execution_handler_fn_arn: str): # GIVEN - arguments = json.dumps({"message": "Powertools for AWS Lambda (Python) - Parallel execution"}) + payload = json.dumps({"message": "Powertools for AWS Lambda (Python) - Parallel execution"}) - # WHEN - # executing Lambdas in parallel - lambdas_arn = [parallel_execution_handler_fn_arn, parallel_execution_handler_fn_arn] - execution_result_list = execute_lambdas_in_parallel("data_fetcher.get_lambda_response", lambdas_arn, arguments) + invocation_options = [ + GetLambdaResponseOptions(lambda_arn=parallel_execution_handler_fn_arn, payload=payload, raise_on_error=False), + GetLambdaResponseOptions(lambda_arn=parallel_execution_handler_fn_arn, payload=payload, raise_on_error=False), + ] + + # WHEN executing Lambdas in parallel + execution_result_list = get_lambda_response_in_parallel(invocation_options) timeout_execution_response = execution_result_list[0][0]["Payload"].read().decode("utf-8") error_idempotency_execution_response = execution_result_list[1][0]["Payload"].read().decode("utf-8") diff --git a/tests/e2e/utils/data_builder/traces.py b/tests/e2e/utils/data_builder/traces.py index 59350c8ff68..e6356582a30 100644 --- a/tests/e2e/utils/data_builder/traces.py +++ b/tests/e2e/utils/data_builder/traces.py @@ -2,7 +2,7 @@ def build_trace_default_query(function_name: str) -> str: - return f'service("{function_name}")' + return f'service(id(name: "{function_name}"))' def build_put_annotations_input(**annotations: str) -> List[Dict]: diff --git a/tests/e2e/utils/data_fetcher/common.py b/tests/e2e/utils/data_fetcher/common.py index bab4f8c75fb..9c251cd6ed2 100644 --- a/tests/e2e/utils/data_fetcher/common.py +++ b/tests/e2e/utils/data_fetcher/common.py @@ -1,21 +1,38 @@ +import functools +import time +from concurrent.futures import Future, ThreadPoolExecutor from datetime import datetime -from typing import Optional, Tuple +from typing import List, Optional, Tuple import boto3 import requests from mypy_boto3_lambda import LambdaClient from mypy_boto3_lambda.type_defs import InvocationResponseTypeDef +from pydantic import BaseModel from requests import Request, Response from requests.exceptions import RequestException from retry import retry +GetLambdaResponse = Tuple[InvocationResponseTypeDef, datetime] + + +class GetLambdaResponseOptions(BaseModel): + lambda_arn: str + payload: Optional[str] = None + client: Optional[LambdaClient] = None + raise_on_error: bool = True + + # Maintenance: Pydantic v2 deprecated it; we should update in v3 + class Config: + arbitrary_types_allowed = True + def get_lambda_response( lambda_arn: str, payload: Optional[str] = None, client: Optional[LambdaClient] = None, raise_on_error: bool = True, -) -> Tuple[InvocationResponseTypeDef, datetime]: +) -> GetLambdaResponse: """Invoke function synchronously Parameters @@ -42,18 +59,18 @@ def get_lambda_response( client = client or boto3.client("lambda") payload = payload or "" execution_time = datetime.utcnow() - response: InvocationResponseTypeDef = client.invoke( FunctionName=lambda_arn, InvocationType="RequestResponse", Payload=payload, ) + has_error = response.get("FunctionError", "") == "Unhandled" if has_error and raise_on_error: error_payload = response["Payload"].read().decode() raise RuntimeError(f"Function failed invocation: {error_payload}") - return client.invoke(FunctionName=lambda_arn, InvocationType="RequestResponse", Payload=payload), execution_time + return response, execution_time @retry(RequestException, delay=2, jitter=1.5, tries=5) @@ -62,3 +79,39 @@ def get_http_response(request: Request) -> Response: result = session.send(request.prepare()) result.raise_for_status() return result + + +def get_lambda_response_in_parallel( + get_lambda_response_options: List[GetLambdaResponseOptions], +) -> List[GetLambdaResponse]: + """Invoke functions in parallel + + Parameters + ---------- + get_lambda_response_options : List[GetLambdaResponseOptions] + List of options to call get_lambda_response with + + Returns + ------- + List[GetLambdaResponse] + Function responses and approximate execution time + """ + result_list = [] + with ThreadPoolExecutor() as executor: + running_tasks: List[Future] = [] + for options in get_lambda_response_options: + # Sleep 0.5, 1, 1.5, ... seconds between each invocation. This way + # we can guarantee that lambdas are executed in parallel, but they are + # called in the same "order" as they are passed in, thus guaranteeing that + # we can assert on the correct output. + time.sleep(0.5 * len(running_tasks)) + + get_lambda_response_callback = functools.partial(get_lambda_response, **options.dict()) + running_tasks.append( + executor.submit(get_lambda_response_callback), + ) + + executor.shutdown(wait=True) + result_list.extend(running_task.result() for running_task in running_tasks) + + return result_list diff --git a/tests/e2e/utils/functions.py b/tests/e2e/utils/functions.py deleted file mode 100644 index 64135c96aa3..00000000000 --- a/tests/e2e/utils/functions.py +++ /dev/null @@ -1,32 +0,0 @@ -import time -from concurrent.futures import Future, ThreadPoolExecutor -from typing import List - -from tests.e2e.utils import data_fetcher # noqa F401 - - -def execute_lambdas_in_parallel(function_name: str, lambdas_arn: list, arguments: str): - result_list = [] - with ThreadPoolExecutor() as executor: - running_tasks: List[Future] = [] - for arn in lambdas_arn: - # Sleep 0.5, 1, 1.5, ... seconds between each invocation. This way - # we can guarantee that lambdas are executed in parallel, but they are - # called in the same "order" as they are passed in, thus guaranteeing that - # we can assert on the correct output. - time.sleep(0.5 * len(running_tasks)) - running_tasks.append( - executor.submit( - lambda lname, larn, largs: eval(lname)(larn, largs), - function_name, - arn, - arguments, - ), - ) - - executor.shutdown(wait=True) - - for running_task in running_tasks: - result_list.append(running_task.result()) - - return result_list From 5f684cf164e0f49a5d11a26f08b6074f4906800d Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Tue, 25 Jul 2023 14:39:50 +0200 Subject: [PATCH 5/6] fix(parameters): make cache aware of single vs multiple calls Signed-off-by: heitorlessa --- aws_lambda_powertools/utilities/parameters/base.py | 2 +- aws_lambda_powertools/utilities/parameters/types.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/parameters/base.py b/aws_lambda_powertools/utilities/parameters/base.py index e4be9d33cdc..78bf865faf0 100644 --- a/aws_lambda_powertools/utilities/parameters/base.py +++ b/aws_lambda_powertools/utilities/parameters/base.py @@ -27,7 +27,7 @@ from aws_lambda_powertools.shared import constants, user_agent from aws_lambda_powertools.shared.functions import resolve_max_age -from aws_lambda_powertools.utilities.parameters.types import TransformOptions +from aws_lambda_powertools.utilities.parameters.types import RecursiveOptions, TransformOptions from .exceptions import GetParameterError, TransformParameterError diff --git a/aws_lambda_powertools/utilities/parameters/types.py b/aws_lambda_powertools/utilities/parameters/types.py index 6a15873c496..2dbf1593d72 100644 --- a/aws_lambda_powertools/utilities/parameters/types.py +++ b/aws_lambda_powertools/utilities/parameters/types.py @@ -1,3 +1,4 @@ from typing_extensions import Literal TransformOptions = Literal["json", "binary", "auto", None] +RecursiveOptions = Literal[True, False] From de16a0f220d2129b2b4cc2c9d0a2cf3d1e93dfce Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Tue, 25 Jul 2023 15:16:51 +0200 Subject: [PATCH 6/6] chore: cleanup, add test for single and nested Signed-off-by: heitorlessa --- aws_lambda_powertools/utilities/parameters/base.py | 2 +- aws_lambda_powertools/utilities/parameters/types.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/parameters/base.py b/aws_lambda_powertools/utilities/parameters/base.py index 78bf865faf0..e4be9d33cdc 100644 --- a/aws_lambda_powertools/utilities/parameters/base.py +++ b/aws_lambda_powertools/utilities/parameters/base.py @@ -27,7 +27,7 @@ from aws_lambda_powertools.shared import constants, user_agent from aws_lambda_powertools.shared.functions import resolve_max_age -from aws_lambda_powertools.utilities.parameters.types import RecursiveOptions, TransformOptions +from aws_lambda_powertools.utilities.parameters.types import TransformOptions from .exceptions import GetParameterError, TransformParameterError diff --git a/aws_lambda_powertools/utilities/parameters/types.py b/aws_lambda_powertools/utilities/parameters/types.py index 2dbf1593d72..6a15873c496 100644 --- a/aws_lambda_powertools/utilities/parameters/types.py +++ b/aws_lambda_powertools/utilities/parameters/types.py @@ -1,4 +1,3 @@ from typing_extensions import Literal TransformOptions = Literal["json", "binary", "auto", None] -RecursiveOptions = Literal[True, False]