A candidate for wr.dynamodb.read_items #1867

Closed
9 of 12 tasks
a-slice-of-py opened this issue Dec 15, 2022 · 4 comments

@a-slice-of-py
Contributor

DISCLAIMER: I'm opening a new issue even though #895 has a similar scope: I commented there even though it was already closed, but I wasn't sure the proposal had gotten enough visibility, so here I am.

I recently put some effort into reading items from a DynamoDB table and returning a pandas DataFrame. Basically, I wanted to abstract away some of the complexity of the available Boto3 read actions, and deal once and for all with the headache of thinking about keys, query, scan, etc. Since I'm pretty happy with the results, I decided to share them here so you can evaluate whether the proposed solution is a good candidate for wr.dynamodb.read_items.

I am aware of the recent addition of wr.dynamodb.read_partiql_query in #1390, as well as its current issue reported in #1571, but the solution proposed below does not involve PartiQL: my goal was to avoid, as much as possible, the risk that comes with using it against a DynamoDB table, namely that a given query may be translated into a full scan operation (see for example the disclaimer in the docs).

Features

I tested all the checked features below with relatively simple dummy tables, so the code probably needs more focused testing before it can be added to the stable awswrangler codebase.

  • [x] automatically switch between the available DynamoDB read actions, choosing the optimal one according to the hierarchy get_item > batch_get_item > query > scan (inspiration from here and here)
  • [x] handle response pagination ("manually", without using paginators)
  • [x] handle possible UnprocessedKeys in the batch_get_item response
  • [x] support condition expressions both as strings and as Key/Attr from boto3.dynamodb.conditions
  • [x] automatically handle botocore client errors involving DynamoDB reserved keywords via ExpressionAttributeNames substitutions
  • [x] prevent unwanted full table scans thanks to the allow_full_scan kwarg, which defaults to False
  • [x] allow attribute selection via the columns kwarg, which corresponds to the Boto3 ProjectionExpression
  • [x] support both strongly and eventually consistent reads with the consistent kwarg, which defaults to False
  • [x] support limiting the number of items evaluated with the max_items_evaluated kwarg (a kind of head() method for the table!)
  • [ ] return consumed read capacity units
  • [ ] anything related to local or global secondary indexes
  • [ ] parallel scan

The last three features are unchecked because I considered them out of scope, at least for now.
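The hierarchy get_item > batch_get_item > query > scan reduces to a decision on whether explicit keys or a key condition were provided. A minimal, offline sketch of that decision, mirroring the conditions used in `_read_items` in the source code below; `choose_read_action` is a hypothetical name, not part of awswrangler:

```python
from typing import Any, Mapping, Optional, Sequence


def choose_read_action(
    keys: Optional[Sequence[Mapping[str, Any]]], kwargs: Mapping[str, Any]
) -> str:
    """Pick the cheapest DynamoDB read action for the given inputs (hypothetical helper)."""
    if keys is not None:
        # Explicit primary keys: a single lookup, or a batched lookup for several keys
        return "get_item" if len(keys) == 1 else "batch_get_item"
    if "KeyConditionExpression" in kwargs:
        # A key condition restricts the read to a single partition
        return "query"
    # Anything else has to walk the table (possibly filtered or limited)
    return "scan"


print(choose_read_action([{"pk": "a"}], {}))  # get_item
print(choose_read_action([{"pk": "a"}, {"pk": "b"}], {}))  # batch_get_item
print(choose_read_action(None, {"KeyConditionExpression": "pk = :v"}))  # query
print(choose_read_action(None, {"FilterExpression": "x = :v"}))  # scan
```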

Source code

The snippet below should be self-contained, but if testing it becomes inconvenient I could temporarily package it and publish it to PyPI (I tried to stick to awswrangler best practices as much as I could).

import re
from functools import wraps
from typing import Any, Callable, List, Mapping, Optional, Sequence, Union

import awswrangler as wr
import boto3
import pandas as pd
from awswrangler import _utils, exceptions
from boto3.dynamodb.conditions import ConditionBase
from botocore.exceptions import ClientError


def get_invalid_kwarg(msg: str) -> Optional[str]:
    """Detect which kwarg contains reserved keywords based on given error message.

    Parameters
    ----------
    msg : str
        Botocore client error message.

    Returns
    -------
    str, optional
        Detected invalid kwarg if any, None otherwise.
    """
    for kwarg in ("ProjectionExpression", "KeyConditionExpression", "FilterExpression"):
        if msg.startswith(
            f"Invalid {kwarg}: Attribute name is a reserved keyword; reserved keyword: "
        ):
            return kwarg
    return None


def handle_reserved_keyword_error(func: Callable) -> Callable:
    """Handle automatic replacement of DynamoDB reserved keywords.

    For reserved keywords reference: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ReservedWords.html.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except ClientError as e:
            error_code, error_message = (
                e.response["Error"]["Code"],
                e.response["Error"]["Message"],
            )
            # Check the caught error to see whether a reserved keyword caused it
            if (error_code == "ValidationException") and (
                kwarg := get_invalid_kwarg(error_message)
            ):
                reserved_keyword = error_message.split("keyword: ")[-1]
                sanitized_keyword = f"#{reserved_keyword}"
                # Replace whole attribute names only (e.g. "name" but not "username"),
                # skipping existing placeholders ("#name") and value tokens (":name")
                sanitized_expression = re.sub(
                    rf"(?<![\w#:]){re.escape(reserved_keyword)}(?!\w)",
                    sanitized_keyword,
                    kwargs[kwarg],
                )
                if sanitized_expression == kwargs[kwarg]:
                    # Nothing to replace: re-raise instead of recursing forever
                    raise
                kwargs[kwarg] = sanitized_expression
                kwargs["ExpressionAttributeNames"] = {
                    **kwargs.get("ExpressionAttributeNames", {}),
                    sanitized_keyword: reserved_keyword,
                }
                # SEE: the recursive approach guarantees that each reserved keyword is replaced,
                # even if it requires as many calls as there are reserved keywords (not so efficient...)
                return wrapper(*args, **kwargs)
            # Otherwise re-raise it
            raise

    return wrapper


@handle_reserved_keyword_error
def _read_items(
    table_name: str, boto3_session: Optional[boto3.Session] = None, **kwargs
) -> Sequence:
    """Read items from given DynamoDB table.

    This function sets the optimal reading strategy based on the received kwargs.

    Parameters
    ----------
    table_name : str
        DynamoDB table name.
    boto3_session : boto3.Session, optional
        Boto3 Session. Defaults to None (the default boto3 Session will be used).

    Returns
    -------
    Sequence
        Retrieved items.
    """
    # Get DynamoDB resource and Table instance
    resource = _utils.resource(service_name="dynamodb", session=boto3_session)
    table = wr.dynamodb.get_table(table_name=table_name, boto3_session=boto3_session)

    # Extract 'Keys' from provided kwargs: if needed, will be reinserted later on
    keys = kwargs.pop("Keys", None)

    # Conditionally define optimal reading strategy
    use_get_item = (keys is not None) and (len(keys) == 1)
    use_batch_get_item = (keys is not None) and (len(keys) > 1)
    use_query = (keys is None) and ("KeyConditionExpression" in kwargs)
    use_scan = (keys is None) and ("KeyConditionExpression" not in kwargs)

    # Read items
    items: List[Mapping[str, Any]] = []
    if use_get_item:
        kwargs["Key"] = keys[0]
        item = table.get_item(**kwargs).get("Item")
        # No 'Item' in the response means no match: return no rows rather than an empty one
        items = [item] if item else []
    elif use_batch_get_item:
        # batch_get_item accepts at most 100 keys per request, so read them in chunks
        for start in range(0, len(keys), 100):
            kwargs["Keys"] = keys[start : start + 100]
            response = resource.batch_get_item(RequestItems={table_name: kwargs})
            items.extend(response.get("Responses", {}).get(table_name, []))
            # SEE: handle possible unprocessed keys. Boto3 docs suggest retrying them
            # with exponential backoff; the SDK's built-in retries cover failed requests
            # (e.g. throttling), not partially processed batches, so no backoff is applied here
            while response.get("UnprocessedKeys"):
                kwargs["Keys"] = response["UnprocessedKeys"][table_name]["Keys"]
                response = resource.batch_get_item(RequestItems={table_name: kwargs})
                items.extend(response.get("Responses", {}).get(table_name, []))
    elif use_query or use_scan:
        _read_method = table.query if use_query else table.scan
        response = _read_method(**kwargs)
        items = response.get("Items", [])

        # Handle pagination; when 'Limit' is set (max_items_evaluated), a single page
        # already evaluates up to 'Limit' items, so stop there instead of reading on
        while "LastEvaluatedKey" in response and "Limit" not in kwargs:
            kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
            response = _read_method(**kwargs)
            items.extend(response.get("Items", []))

    return items


def read_items(
    *,
    table_name: str,
    partition_values: Optional[Sequence[Any]] = None,
    sort_values: Optional[Sequence[Any]] = None,
    filter_expression: Optional[Union[ConditionBase, str]] = None,
    key_condition_expression: Optional[Union[ConditionBase, str]] = None,
    expression_attribute_names: Optional[Mapping] = None,
    expression_attribute_values: Optional[Mapping] = None,
    consistent: bool = False,
    columns: Optional[Sequence] = None,
    allow_full_scan: bool = False,
    max_items_evaluated: Optional[int] = None,
    as_dataframe: bool = True,
    boto3_session: Optional[boto3.Session] = None,
) -> Union[pd.DataFrame, List, Mapping]:
    """Read items from given DynamoDB table.

    This function aims to gracefully handle (some of) the complexity of the read actions
    that Boto3 offers for a DynamoDB table, abstracting it away behind a single, unified
    entrypoint.

    Under the hood, it wraps all four available read actions: get_item, batch_get_item,
    query and scan.

    Parameters
    ----------
    table_name : str
        DynamoDB table name.
    partition_values : Sequence[Any], optional
        Partition key values to retrieve. Defaults to None.
    sort_values : Sequence[Any], optional
        Sort key values to retrieve. Defaults to None.
    filter_expression : Union[ConditionBase, str], optional
        Filter expression as string or combinations of boto3.dynamodb.conditions.Attr conditions. Defaults to None.
    key_condition_expression : Union[ConditionBase, str], optional
        Key condition expression as string or combinations of boto3.dynamodb.conditions.Key conditions. Defaults to None.
    expression_attribute_names : Mapping, optional
        Mapping of placeholder and target attributes. Defaults to None.
    expression_attribute_values : Mapping, optional
        Mapping of placeholder and target values. Defaults to None.
    consistent : bool
        If True, ensure that the performed read operation is strongly consistent, otherwise eventually consistent. Defaults to False.
    columns : Sequence, optional
        Attributes to retain in the returned items. Defaults to None (all attributes).
    allow_full_scan : bool
        If True, allow full table scan without any filtering. Defaults to False.
    max_items_evaluated : int, optional
        Limit the number of items evaluated in case of query or scan operations. Defaults to None (all matching items).
    as_dataframe : bool
        If True, return items as pd.DataFrame, otherwise as a list of dicts. Defaults to True.
    boto3_session : boto3.Session, optional
        Boto3 Session. Defaults to None (the default boto3 Session will be used).

    Raises
    ------
    exceptions.InvalidArgumentType
        When the specified table also has a sort key but only partition values are specified.
    exceptions.InvalidArgumentCombination
        When both partition and sort values are specified but have different lengths, or when the provided parameters are not informative enough to proceed with a read operation.

    Returns
    -------
    Union[pd.DataFrame, List, Mapping]
        A DataFrame containing the retrieved items, or a list of raw items (dicts) if as_dataframe is False.

    Examples
    --------
    Reading up to 5 items from a table, in no particular order

    >>> import awswrangler as wr
    >>> df = wr.dynamodb.read_items(table_name='my-table', max_items_evaluated=5)

    Strongly-consistent reading of a given partition value from a table

    >>> import awswrangler as wr
    >>> df = wr.dynamodb.read_items(table_name='my-table', partition_values=['my-value'], consistent=True)

    Reading items pairwise-identified by partition and sort values, from a table with a composite primary key

    >>> import awswrangler as wr
    >>> df = wr.dynamodb.read_items(
    ...     table_name='my-table',
    ...     partition_values=['pv_1', 'pv_2'],
    ...     sort_values=['sv_1', 'sv_2']
    ... )

    Reading items while retaining only specified attributes, automatically handling possible collision with DynamoDB reserved keywords

    >>> import awswrangler as wr
    >>> df = wr.dynamodb.read_items(
    ...     table_name='my-table',
    ...     partition_values=['my-value'],
    ...     columns=['connection', 'other_col'] # connection is a reserved keyword, managed under the hood!
    ... )

    Reading all items from a table explicitly allowing full scan

    >>> import awswrangler as wr
    >>> df = wr.dynamodb.read_items(table_name='my-table', allow_full_scan=True)

    Reading items matching a KeyConditionExpression expressed with boto3.dynamodb.conditions.Key

    >>> import awswrangler as wr
    >>> from boto3.dynamodb.conditions import Key
    >>> df = wr.dynamodb.read_items(
    ...     table_name='my-table',
    ...     key_condition_expression=Key('key_1').eq('val_1') & Key('key_2').eq('val_2')
    ... )

    Same as above, but with KeyConditionExpression as string

    >>> import awswrangler as wr
    >>> df = wr.dynamodb.read_items(
    ...     table_name='my-table',
    ...     key_condition_expression='key_1 = :v1 and key_2 = :v2',
    ...     expression_attribute_values={':v1': 'val_1', ':v2': 'val_2'},
    ... )

    Reading items matching a FilterExpression expressed with boto3.dynamodb.conditions.Attr

    >>> import awswrangler as wr
    >>> from boto3.dynamodb.conditions import Attr
    >>> df = wr.dynamodb.read_items(
    ...     table_name='my-table',
    ...     filter_expression=Attr('my_attr').eq('this-value')
    ... )

    Same as above, but with FilterExpression as string

    >>> import awswrangler as wr
    >>> df = wr.dynamodb.read_items(
    ...     table_name='my-table',
    ...     filter_expression='my_attr = :v',
    ...     expression_attribute_values={':v': 'this-value'}
    ... )

    Reading items involving an attribute which collides with DynamoDB reserved keywords

    >>> import awswrangler as wr
    >>> df = wr.dynamodb.read_items(
    ...     table_name='my-table',
    ...     filter_expression='#operator = :v',
    ...     expression_attribute_names={'#operator': 'operator'},
    ...     expression_attribute_values={':v': 'this-value'}
    ... )

    """
    # Extract key schema
    table_key_schema = wr.dynamodb.get_table(
        table_name=table_name, boto3_session=boto3_session
    ).key_schema

    # Detect sort key, if any
    if len(table_key_schema) == 1:
        partition_key, sort_key = table_key_schema[0]["AttributeName"], None
    else:
        partition_key, sort_key = (
            next(filter(lambda x: x["KeyType"] == "HASH", table_key_schema))[
                "AttributeName"
            ],
            next(filter(lambda x: x["KeyType"] == "RANGE", table_key_schema))[
                "AttributeName"
            ],
        )

    # Handy checker
    def ensure_coherency():
        if not sort_values:
            raise exceptions.InvalidArgumentType(
                f"Kwarg sort_values must be specified: table {table_name} has {sort_key} as sort key."
            )
        elif len(sort_values) != len(partition_values):
            raise exceptions.InvalidArgumentCombination(
                "Partition and sort values must have the same length."
            )

    # Build kwargs shared by read methods
    kwargs = {"ConsistentRead": consistent}
    if partition_values:
        if sort_key is None:
            keys = [{partition_key: pv} for pv in partition_values]
        else:
            ensure_coherency()
            keys = [
                {partition_key: pv, sort_key: sv}
                for pv, sv in zip(partition_values, sort_values)
            ]
        kwargs["Keys"] = keys
    if key_condition_expression:
        kwargs["KeyConditionExpression"] = key_condition_expression
    if filter_expression:
        kwargs["FilterExpression"] = filter_expression
    if columns:
        kwargs["ProjectionExpression"] = ", ".join(columns)
    if expression_attribute_names:
        kwargs["ExpressionAttributeNames"] = expression_attribute_names
    if expression_attribute_values:
        kwargs["ExpressionAttributeValues"] = expression_attribute_values
    if max_items_evaluated:
        kwargs["Limit"] = max_items_evaluated

    # If kwargs are sufficiently informative, proceed with actual read op
    if (
        partition_values
        or key_condition_expression
        or filter_expression
        or allow_full_scan
        or max_items_evaluated
    ):
        items = _read_items(table_name, boto3_session, **kwargs)
    # Raise otherwise
    else:
        raise exceptions.InvalidArgumentCombination(
            "Please provide at least one of partition_values, key_condition_expression, filter_expression, allow_full_scan or max_items_evaluated."
        )

    # Enforce DataFrame type if requested
    if as_dataframe:
        return pd.DataFrame(items)
    else:
        return items
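On the unprocessed-keys loop in `_read_items`: a minimal sketch of retrying `UnprocessedKeys` with exponential backoff, as the Boto3 docs suggest. It is written against a plain callable so it can run without AWS; `batch_get_with_backoff`, its parameters and the fake client are hypothetical, and with a real table the callable would be `resource.batch_get_item`.

```python
import time
from typing import Any, Callable, Dict, List, Mapping


def batch_get_with_backoff(
    batch_get: Callable[..., Mapping[str, Any]],
    table_name: str,
    request: Dict[str, Any],
    max_retries: int = 5,
    base_delay: float = 0.05,
) -> List[Mapping[str, Any]]:
    """Call batch_get(RequestItems=...) until no keys remain unprocessed (hypothetical helper)."""
    items: List[Mapping[str, Any]] = []
    request = dict(request)  # copy, so the caller's dict is not mutated
    for attempt in range(max_retries + 1):
        if attempt:
            # Exponential backoff before each retry: base_delay, 2 * base_delay, 4 * base_delay, ...
            time.sleep(base_delay * 2 ** (attempt - 1))
        response = batch_get(RequestItems={table_name: request})
        items.extend(response.get("Responses", {}).get(table_name, []))
        unprocessed = response.get("UnprocessedKeys", {}).get(table_name)
        if not unprocessed:
            return items
        # Retry only the keys DynamoDB did not process
        request["Keys"] = unprocessed["Keys"]
    raise RuntimeError(f"Keys still unprocessed after {max_retries} retries")


# Fake client (hypothetical): processes one key per call, leaves the rest unprocessed
calls: List[int] = []


def fake_batch_get(RequestItems: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    keys = RequestItems["my-table"]["Keys"]
    calls.append(len(keys))
    response: Dict[str, Any] = {"Responses": {"my-table": [dict(keys[0], value=1)]}}
    if len(keys) > 1:
        response["UnprocessedKeys"] = {"my-table": {"Keys": keys[1:]}}
    return response


items = batch_get_with_backoff(
    fake_batch_get, "my-table", {"Keys": [{"pk": "a"}, {"pk": "b"}, {"pk": "c"}]}, base_delay=0.001
)
print([item["pk"] for item in items])  # ['a', 'b', 'c']
print(calls)  # [3, 2, 1]
```

Bounding the retries keeps a persistently throttled table from looping forever, at the price of raising once the budget is exhausted.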

Examples

Reading up to 5 items from a table, in no particular order

import awswrangler as wr
df = wr.dynamodb.read_items(table_name='my-table', max_items_evaluated=5)
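DynamoDB's `Limit` caps the number of items evaluated per request, so `max_items_evaluated` only behaves like a `head()` if pagination stops after the first page once `Limit` is set; if the loop keeps following `LastEvaluatedKey`, it ends up reading the whole table a few items at a time. A minimal sketch of that pagination rule against a fake in-memory scan (`read_all_pages` and `fake_scan` are hypothetical, no AWS calls):

```python
from typing import Any, Callable, Dict, List, Mapping, Optional


def read_all_pages(read_method: Callable[..., Mapping[str, Any]], **kwargs: Any) -> List[Any]:
    """Follow LastEvaluatedKey across pages, unless Limit is set (then keep the first page only)."""
    response = read_method(**kwargs)
    items = list(response.get("Items", []))
    while "LastEvaluatedKey" in response and "Limit" not in kwargs:
        kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
        response = read_method(**kwargs)
        items.extend(response.get("Items", []))
    return items


# Fake 10-item table and scan (hypothetical): pages of 4 items by default, standing in
# for DynamoDB's 1 MB page size, or `Limit` items when Limit is given
TABLE = [{"pk": i} for i in range(10)]


def fake_scan(Limit: int = 4, ExclusiveStartKey: Optional[Dict[str, int]] = None) -> Dict[str, Any]:
    start = 0 if ExclusiveStartKey is None else ExclusiveStartKey["pk"] + 1
    page = TABLE[start : start + Limit]
    response: Dict[str, Any] = {"Items": page}
    if start + Limit < len(TABLE):
        # More items remain: tell the caller where to resume
        response["LastEvaluatedKey"] = page[-1]
    return response


print(len(read_all_pages(fake_scan)))  # 10
print([item["pk"] for item in read_all_pages(fake_scan, Limit=5)])  # [0, 1, 2, 3, 4]
```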

Strongly-consistent reading of a given partition value from a table

import awswrangler as wr
df = wr.dynamodb.read_items(table_name='my-table', partition_values=['my-value'], consistent=True)

Reading items pairwise-identified by partition and sort values, from a table with a composite primary key

import awswrangler as wr
df = wr.dynamodb.read_items(
    table_name='my-table',
    partition_values=['pv_1', 'pv_2'],
    sort_values=['sv_1', 'sv_2']
)
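With a composite primary key, partition and sort values are paired position by position into one key per item, and `batch_get_item` accepts at most 100 keys per request, so longer key lists have to be split. A minimal sketch of both steps; `build_keys` and `chunk_keys` are hypothetical helpers, the attribute names `pk`/`sk` are assumptions, and `ValueError` stands in for awswrangler's own exceptions:

```python
from typing import Any, Dict, Iterator, List, Optional, Sequence


def build_keys(
    partition_key: str,
    partition_values: Sequence[Any],
    sort_key: Optional[str] = None,
    sort_values: Optional[Sequence[Any]] = None,
) -> List[Dict[str, Any]]:
    """Pair partition and sort values into DynamoDB primary keys."""
    if sort_key is None:
        return [{partition_key: pv} for pv in partition_values]
    if sort_values is None or len(sort_values) != len(partition_values):
        raise ValueError("Partition and sort values must have the same length.")
    return [{partition_key: pv, sort_key: sv} for pv, sv in zip(partition_values, sort_values)]


def chunk_keys(keys: Sequence[Dict[str, Any]], size: int = 100) -> Iterator[Sequence[Dict[str, Any]]]:
    """Yield consecutive slices of at most `size` keys (100 is the batch_get_item cap)."""
    for start in range(0, len(keys), size):
        yield keys[start : start + size]


keys = build_keys("pk", ["pv_1", "pv_2"], "sk", ["sv_1", "sv_2"])
print(keys)  # [{'pk': 'pv_1', 'sk': 'sv_1'}, {'pk': 'pv_2', 'sk': 'sv_2'}]
print([len(chunk) for chunk in chunk_keys(build_keys("pk", range(250)))])  # [100, 100, 50]
```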

Reading items while retaining only specified attributes, automatically handling possible collision with DynamoDB reserved keywords

import awswrangler as wr
df = wr.dynamodb.read_items(
    table_name='my-table', 
    partition_values=['my-value'], 
    columns=['connection', 'other_col'] # connection is a reserved keyword, managed under the hood!
)
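The retry-on-error approach of `handle_reserved_keyword_error` costs one extra request per reserved keyword. An alternative design, sketched here as an assumption and not as the proposal's method, is to alias every projected attribute up front, so reserved keywords never reach DynamoDB unescaped. `build_projection` is hypothetical and assumes column names are plain attribute names, optionally dot-separated (list indexes such as `a[0]` are not handled). Its two outputs would be passed as `ProjectionExpression` and `ExpressionAttributeNames`, merged with any names the caller already supplied.

```python
from typing import Dict, List, Sequence, Tuple


def build_projection(columns: Sequence[str]) -> Tuple[str, Dict[str, str]]:
    """Return a ProjectionExpression and ExpressionAttributeNames aliasing every path segment."""
    names: Dict[str, str] = {}  # placeholder -> attribute name
    aliases: Dict[str, str] = {}  # attribute name -> placeholder, so repeats share one alias
    paths: List[str] = []
    for column in columns:
        segments = []
        # Nested attributes ("a.b") are aliased segment by segment
        for segment in column.split("."):
            if segment not in aliases:
                aliases[segment] = f"#p{len(aliases)}"
                names[aliases[segment]] = segment
            segments.append(aliases[segment])
        paths.append(".".join(segments))
    return ", ".join(paths), names


projection, names = build_projection(["connection", "other_col", "info.connection"])
print(projection)  # #p0, #p1, #p2.#p0
print(names)  # {'#p0': 'connection', '#p1': 'other_col', '#p2': 'info'}
```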

Reading all items from a table explicitly allowing full scan

import awswrangler as wr
df = wr.dynamodb.read_items(table_name='my-table', allow_full_scan=True)
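The `allow_full_scan` safeguard amounts to refusing to read unless at least one argument narrows or bounds the read. A minimal sketch of that check in isolation; `ensure_bounded_read` is hypothetical and raises `ValueError` in place of awswrangler's `InvalidArgumentCombination`:

```python
from typing import Any, Optional, Sequence


def ensure_bounded_read(
    partition_values: Optional[Sequence[Any]] = None,
    key_condition_expression: Any = None,
    filter_expression: Any = None,
    allow_full_scan: bool = False,
    max_items_evaluated: Optional[int] = None,
) -> None:
    """Raise unless the read is narrowed (keys, conditions), bounded (Limit) or explicitly allowed."""
    if not (
        partition_values
        or key_condition_expression
        or filter_expression
        or allow_full_scan
        or max_items_evaluated
    ):
        raise ValueError(
            "Please provide at least one of partition_values, key_condition_expression, "
            "filter_expression, allow_full_scan or max_items_evaluated."
        )


ensure_bounded_read(allow_full_scan=True)  # passes
ensure_bounded_read(max_items_evaluated=5)  # passes
```

Note that a filter_expression alone passes this check but still results in a scan that reads, and consumes capacity for, every item: DynamoDB applies the filter only after reading, so it trims what is returned, not what is read.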

Reading items matching a KeyConditionExpression expressed with boto3.dynamodb.conditions.Key

import awswrangler as wr
from boto3.dynamodb.conditions import Key
df = wr.dynamodb.read_items(
    table_name='my-table',
    key_condition_expression=Key('key_1').eq('val_1') & Key('key_2').eq('val_2')
)

Same as above, but with KeyConditionExpression as string

import awswrangler as wr
df = wr.dynamodb.read_items(
    table_name='my-table',
    key_condition_expression='key_1 = :v1 and key_2 = :v2',
    expression_attribute_values={':v1': 'val_1', ':v2': 'val_2'},
)

Reading items matching a FilterExpression expressed with boto3.dynamodb.conditions.Attr

import awswrangler as wr
from boto3.dynamodb.conditions import Attr
df = wr.dynamodb.read_items(
    table_name='my-table',
    filter_expression=Attr('my_attr').eq('this-value')
)

Same as above, but with FilterExpression as string

import awswrangler as wr
df = wr.dynamodb.read_items(
    table_name='my-table',
    filter_expression='my_attr = :v',
    expression_attribute_values={':v': 'this-value'}
)
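Because DynamoDB applies a `FilterExpression` after a page has been read, `Limit` (and hence `max_items_evaluated`) counts items evaluated, not items returned, so combining the two can return fewer rows than requested. A toy, in-memory illustration of that order of operations (`fake_scan_page` is hypothetical; `Count` and `ScannedCount` mirror the fields of a real scan response):

```python
from typing import Any, Callable, Dict, List


def fake_scan_page(
    table: List[Dict[str, Any]], limit: int, keep: Callable[[Dict[str, Any]], bool]
) -> Dict[str, Any]:
    """Evaluate the first `limit` items, then filter them, as a single scan page does."""
    evaluated = table[:limit]
    returned = [item for item in evaluated if keep(item)]
    return {"Items": returned, "Count": len(returned), "ScannedCount": len(evaluated)}


table = [{"pk": i, "my_attr": "this-value" if i % 2 == 0 else "other"} for i in range(10)]
page = fake_scan_page(table, limit=5, keep=lambda item: item["my_attr"] == "this-value")
print(page["ScannedCount"], page["Count"])  # 5 3
```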

Reading items involving an attribute which collides with DynamoDB reserved keywords

import awswrangler as wr
df = wr.dynamodb.read_items(
    table_name='my-table',
    filter_expression='#operator = :v',
    expression_attribute_names={'#operator': 'operator'},
    expression_attribute_values={':v': 'this-value'}
)

Package versions

pandas==1.5.2
botocore==1.29.16
boto3==1.26.16
awswrangler==2.17.0

and Python 3.10.8.

@jaidisido
Contributor

This is really fantastic work @a-slice-of-py, and yes, we would be very keen to include this in the library. Could you create a PR with the above so we can discuss the changes there?

@a-slice-of-py
Contributor Author

a-slice-of-py commented Dec 15, 2022

Hi @jaidisido, glad you find it valuable!

Regarding the PR:

  • I read in the contributing guidelines that I must ensure local tests pass, but I am not sure what that means with respect to the multiple test environments: could you please clarify?
  • regarding the automated CodeBuild triggered by the PR, should I skip it with the [skip-ci] prefix in the commit message, or is it preferable to let it run?

@jaidisido
Contributor

jaidisido commented Dec 15, 2022

I wouldn't worry too much about that. All our unit tests are triggered when a PR is opened against main anyway. I think the only benefit of running your additional tests locally is that you don't have to wait for our CodeBuild to run all of them (it takes 20 min) to know whether they passed, so the feedback loop is faster. But again, it's optional, and we can work on it in the PR bit by bit.

We recommend adding the [skip-ci] prefix to your commit if you wish to skip the tests altogether. One reason some people add it is if they know beforehand that their commit will fail the tests (so there is no point in running them) or if the change is minor/unrelated (e.g. a doc change). Otherwise it's ok to run them.

a-slice-of-py pushed a commit to a-slice-of-py/aws-sdk-pandas that referenced this issue Dec 16, 2022
jaidisido added a commit that referenced this issue Dec 29, 2022
* feat: add read_items to dynamodb module (#1867)

Co-authored-by: Silvio Lugaro <[email protected]>
Co-authored-by: jaidisido <[email protected]>
Co-authored-by: Lucas Hanson <[email protected]>
Co-authored-by: kukushking <[email protected]>
@jaidisido
Contributor

Now merged; it will be available in the next release. Thanks again!
