Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import uuid
import re
import binascii
from typing import Dict, Any, List, Mapping, Optional, Sequence, Union, Tuple, TYPE_CHECKING
from typing import Any, Mapping, Optional, Sequence, Union, Tuple, TYPE_CHECKING

from urllib.parse import quote as urllib_quote
from urllib.parse import unquote as urllib_unquote
Expand Down Expand Up @@ -79,7 +79,7 @@
_VALID_COSMOS_RESOURCE = re.compile(r"^[^/\\#?\t\r\n]*$")


def _get_match_headers(kwargs: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
def _get_match_headers(kwargs: dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
if_match = kwargs.pop('if_match', None)
if_none_match = kwargs.pop('if_none_match', None)
match_condition = kwargs.pop('match_condition', None)
Expand All @@ -104,7 +104,7 @@ def _get_match_headers(kwargs: Dict[str, Any]) -> Tuple[Optional[str], Optional[
return if_match, if_none_match


def build_options(kwargs: Dict[str, Any]) -> Dict[str, Any]:
def build_options(kwargs: dict[str, Any]) -> dict[str, Any]:
options = kwargs.pop('request_options', kwargs.pop('feed_options', {}))
for key, value in _COMMON_OPTIONS.items():
if key in kwargs:
Expand All @@ -128,7 +128,7 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
options: Mapping[str, Any],
partition_key_range_id: Optional[str] = None,
client_id: Optional[str] = None,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Gets HTTP request headers.

:param _cosmos_client_connection.CosmosClientConnection cosmos_client_connection:
Expand Down Expand Up @@ -712,7 +712,7 @@ def TrimBeginningAndEndingSlashes(path: str) -> str:


# Parses the paths into a list of token each representing a property
def ParsePaths(paths: List[str]) -> List[str]:
def ParsePaths(paths: list[str]) -> list[str]:
segmentSeparator = "/"
tokens = []
for path in paths:
Expand Down Expand Up @@ -791,7 +791,7 @@ def _validate_resource(resource: Mapping[str, Any]) -> None:


def _stringify_auto_scale(offer: ThroughputProperties) -> str:
auto_scale_params: Optional[Dict[str, Union[None, int, Dict[str, Any]]]] = None
auto_scale_params: Optional[dict[str, Union[None, int, dict[str, Any]]]] = None
max_throughput = offer.auto_scale_max_throughput
increment_percent = offer.auto_scale_increment_percent
auto_scale_params = {"maxThroughput": max_throughput}
Expand All @@ -801,7 +801,7 @@ def _stringify_auto_scale(offer: ThroughputProperties) -> str:
return auto_scale_settings


def _set_throughput_options(offer: Optional[Union[int, ThroughputProperties]], request_options: Dict[str, Any]) -> None:
def _set_throughput_options(offer: Optional[Union[int, ThroughputProperties]], request_options: dict[str, Any]) -> None:
if isinstance(offer, int):
request_options["offerThroughput"] = offer
elif offer is not None:
Expand All @@ -820,9 +820,9 @@ def _set_throughput_options(offer: Optional[Union[int, ThroughputProperties]], r
raise TypeError("offer_throughput must be int or an instance of ThroughputProperties") from e


def _deserialize_throughput(throughput: List[Dict[str, Dict[str, Any]]]) -> ThroughputProperties:
def _deserialize_throughput(throughput: list[dict[str, dict[str, Any]]]) -> ThroughputProperties:
properties = throughput[0]
offer_autopilot: Optional[Dict[str, Any]] = properties['content'].get('offerAutopilotSettings')
offer_autopilot: Optional[dict[str, Any]] = properties['content'].get('offerAutopilotSettings')
if offer_autopilot and 'autoUpgradePolicy' in offer_autopilot:
return ThroughputProperties(
properties=properties,
Expand All @@ -842,7 +842,7 @@ def _deserialize_throughput(throughput: List[Dict[str, Dict[str, Any]]]) -> Thro

def _replace_throughput(
throughput: Union[int, ThroughputProperties],
new_throughput_properties: Dict[str, Any]
new_throughput_properties: dict[str, Any]
) -> None:
if isinstance(throughput, int):
new_throughput_properties["content"]["offerThroughput"] = throughput
Expand Down Expand Up @@ -873,15 +873,15 @@ def _internal_resourcetype(resource_type: str) -> str:
return resource_type


def _populate_batch_headers(current_headers: Dict[str, Any]) -> None:
def _populate_batch_headers(current_headers: dict[str, Any]) -> None:
current_headers[http_constants.HttpHeaders.IsBatchRequest] = True
current_headers[http_constants.HttpHeaders.IsBatchAtomic] = True
current_headers[http_constants.HttpHeaders.ShouldBatchContinueOnError] = False


def _format_batch_operations(
operations: Sequence[Union[Tuple[str, Tuple[Any, ...]], Tuple[str, Tuple[Any, ...], Dict[str, Any]]]]
) -> List[Dict[str, Any]]:
operations: Sequence[Union[Tuple[str, Tuple[Any, ...]], Tuple[str, Tuple[Any, ...], dict[str, Any]]]]
) -> list[dict[str, Any]]:
final_operations = []
for index, batch_operation in enumerate(operations):
try:
Expand Down Expand Up @@ -936,7 +936,7 @@ def _format_batch_operations(
return final_operations


def _build_properties_cache(properties: Dict[str, Any], container_link: str) -> Dict[str, Any]:
def _build_properties_cache(properties: dict[str, Any], container_link: str) -> dict[str, Any]:
return {
"_self": properties.get("_self", None), "_rid": properties.get("_rid", None),
"partitionKey": properties.get("partitionKey", None), "container_link": container_link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import base64
import json
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Callable, Tuple, Awaitable, cast
from typing import Any, Callable, Tuple, Awaitable, cast

from azure.cosmos import http_constants, exceptions
from azure.cosmos._change_feed.change_feed_start_from import ChangeFeedStartFromType
Expand All @@ -38,7 +38,7 @@
class ChangeFeedFetcher(ABC):

@abstractmethod
async def fetch_next_block(self) -> List[Dict[str, Any]]:
async def fetch_next_block(self) -> list[dict[str, Any]]:
pass

class ChangeFeedFetcherV1(ChangeFeedFetcher):
Expand All @@ -51,8 +51,8 @@ def __init__(
self,
client,
resource_link: str,
feed_options: Dict[str, Any],
fetch_function: Callable[[Dict[str, Any]], Awaitable[Tuple[List[Dict[str, Any]], Dict[str, Any]]]]
feed_options: dict[str, Any],
fetch_function: Callable[[dict[str, Any]], Awaitable[Tuple[list[dict[str, Any]], dict[str, Any]]]]
) -> None:

self._client = client
Expand All @@ -66,7 +66,7 @@ def __init__(
self._resource_link = resource_link
self._fetch_function = fetch_function

async def fetch_next_block(self) -> List[Dict[str, Any]]:
async def fetch_next_block(self) -> list[dict[str, Any]]:
"""Returns a block of results.

:return: List of results.
Expand All @@ -77,7 +77,7 @@ async def callback():

return await _retry_utility_async.ExecuteAsync(self._client, self._client._global_endpoint_manager, callback)

async def fetch_change_feed_items(self) -> List[Dict[str, Any]]:
async def fetch_change_feed_items(self) -> list[dict[str, Any]]:
self._feed_options["changeFeedState"] = self._change_feed_state

self._change_feed_state.populate_feed_options(self._feed_options)
Expand Down Expand Up @@ -112,8 +112,8 @@ def __init__(
self,
client,
resource_link: str,
feed_options: Dict[str, Any],
fetch_function: Callable[[Dict[str, Any]], Awaitable[Tuple[List[Dict[str, Any]], Dict[str, Any]]]]
feed_options: dict[str, Any],
fetch_function: Callable[[dict[str, Any]], Awaitable[Tuple[list[dict[str, Any]], dict[str, Any]]]]
) -> None:

self._client = client
Expand All @@ -127,7 +127,7 @@ def __init__(
self._resource_link = resource_link
self._fetch_function = fetch_function

async def fetch_next_block(self) -> List[Dict[str, Any]]:
async def fetch_next_block(self) -> list[dict[str, Any]]:
"""Returns a block of results.

:return: List of results.
Expand Down Expand Up @@ -155,7 +155,7 @@ async def callback():

return await self.fetch_next_block()

async def fetch_change_feed_items(self) -> List[Dict[str, Any]]:
async def fetch_change_feed_items(self) -> list[dict[str, Any]]:
self._feed_options["changeFeedState"] = self._change_feed_state

self._change_feed_state.populate_feed_options(self._feed_options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

"""Iterable change feed results in the Azure Cosmos database service.
"""
from typing import Dict, Any, Optional, Callable, Tuple, List, Awaitable, Union
from typing import Any, Optional, Callable, Tuple, Awaitable, Union

from azure.core.async_paging import AsyncPageIterator

Expand All @@ -40,8 +40,8 @@ class ChangeFeedIterable(AsyncPageIterator):
def __init__(
self,
client,
options: Dict[str, Any],
fetch_function=Optional[Callable[[Dict[str, Any]], Awaitable[Tuple[List[Dict[str, Any]], Dict[str, Any]]]]],
options: dict[str, Any],
fetch_function=Optional[Callable[[dict[str, Any]], Awaitable[Tuple[list[dict[str, Any]], dict[str, Any]]]]],
collection_link=Optional[str],
continuation_token=Optional[str],
) -> None:
Expand Down Expand Up @@ -92,8 +92,8 @@ def __init__(

async def _unpack(
self,
block: List[Dict[str, Any]]
) -> Tuple[Optional[str], List[Dict[str, Any]]]:
block: list[dict[str, Any]]
) -> Tuple[Optional[str], list[dict[str, Any]]]:
continuation: Optional[str] = None
if self._client.last_response_headers:
continuation = self._client.last_response_headers.get('etag')
Expand All @@ -102,7 +102,7 @@ async def _unpack(
self._did_a_call_already = False
return continuation, block

async def _fetch_next(self, *args) -> List[Dict[str, Any]]: # pylint: disable=unused-argument
async def _fetch_next(self, *args) -> list[dict[str, Any]]: # pylint: disable=unused-argument
"""Return a block of results with respecting retry policy.

:param Any args:
Expand Down Expand Up @@ -145,7 +145,7 @@ async def _initialize_change_feed_fetcher(self) -> None:
self._fetch_function
)

def _validate_change_feed_state_context(self, change_feed_state_context: Dict[str, Any]) -> None:
def _validate_change_feed_state_context(self, change_feed_state_context: dict[str, Any]) -> None:

if change_feed_state_context.get("continuationPkRangeId") is not None:
# if continuation token is in v1 format, throw exception if feed_range is set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import base64
import json
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Callable, Tuple, cast
from typing import Any, Callable, Tuple, cast

from azure.cosmos import _retry_utility, http_constants, exceptions
from azure.cosmos._change_feed.change_feed_start_from import ChangeFeedStartFromType
Expand All @@ -50,8 +50,8 @@ def __init__(
self,
client,
resource_link: str,
feed_options: Dict[str, Any],
fetch_function: Callable[[Dict[str, Any]], Tuple[List[Dict[str, Any]], Dict[str, Any]]]
feed_options: dict[str, Any],
fetch_function: Callable[[dict[str, Any]], Tuple[list[dict[str, Any]], dict[str, Any]]]
) -> None:

self._client = client
Expand All @@ -65,7 +65,7 @@ def __init__(
self._resource_link = resource_link
self._fetch_function = fetch_function

def fetch_next_block(self) -> List[Dict[str, Any]]:
def fetch_next_block(self) -> list[dict[str, Any]]:
"""Returns a block of results.

:return: List of results.
Expand All @@ -76,7 +76,7 @@ def callback():

return _retry_utility.Execute(self._client, self._client._global_endpoint_manager, callback)

def fetch_change_feed_items(self) -> List[Dict[str, Any]]:
def fetch_change_feed_items(self) -> list[dict[str, Any]]:
self._feed_options["changeFeedState"] = self._change_feed_state

self._change_feed_state.populate_feed_options(self._feed_options)
Expand Down Expand Up @@ -111,8 +111,8 @@ def __init__(
self,
client,
resource_link: str,
feed_options: Dict[str, Any],
fetch_function: Callable[[Dict[str, Any]], Tuple[List[Dict[str, Any]], Dict[str, Any]]]):
feed_options: dict[str, Any],
fetch_function: Callable[[dict[str, Any]], Tuple[list[dict[str, Any]], dict[str, Any]]]):

self._client = client
self._feed_options = feed_options
Expand All @@ -125,7 +125,7 @@ def __init__(
self._resource_link = resource_link
self._fetch_function = fetch_function

def fetch_next_block(self) -> List[Dict[str, Any]]:
def fetch_next_block(self) -> list[dict[str, Any]]:
"""Returns a block of results.

:return: List of results.
Expand All @@ -150,7 +150,7 @@ def callback():

return self.fetch_next_block()

def fetch_change_feed_items(self) -> List[Dict[str, Any]]:
def fetch_change_feed_items(self) -> list[dict[str, Any]]:
self._feed_options["changeFeedState"] = self._change_feed_state

self._change_feed_state.populate_feed_options(self._feed_options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

"""Iterable change feed results in the Azure Cosmos database service.
"""
from typing import Dict, Any, Tuple, List, Optional, Callable, cast, Union
from typing import Any, Tuple, Optional, Callable, cast, Union

from azure.core.paging import PageIterator

Expand All @@ -38,8 +38,8 @@ class ChangeFeedIterable(PageIterator):
def __init__(
self,
client,
options: Dict[str, Any],
fetch_function=Optional[Callable[[Dict[str, Any]], Tuple[List[Dict[str, Any]], Dict[str, Any]]]],
options: dict[str, Any],
fetch_function=Optional[Callable[[dict[str, Any]], Tuple[list[dict[str, Any]], dict[str, Any]]]],
collection_link=Optional[str],
continuation_token=Optional[str],
) -> None:
Expand Down Expand Up @@ -87,7 +87,7 @@ def __init__(
self._unpack, # type: ignore[arg-type]
continuation_token=continuation_token)

def _unpack(self, block: List[Dict[str, Any]]) -> Tuple[Optional[str], List[Dict[str, Any]]]:
def _unpack(self, block: list[dict[str, Any]]) -> Tuple[Optional[str], list[dict[str, Any]]]:
continuation: Optional[str] = None
if self._client.last_response_headers:
continuation = self._client.last_response_headers.get('etag')
Expand All @@ -96,7 +96,7 @@ def _unpack(self, block: List[Dict[str, Any]]) -> Tuple[Optional[str], List[Dict
self._did_a_call_already = False
return continuation, block

def _fetch_next(self, *args) -> List[Dict[str, Any]]: # pylint: disable=unused-argument
def _fetch_next(self, *args) -> list[dict[str, Any]]: # pylint: disable=unused-argument
"""Return a block of results with respecting retry policy.

:param Any args:
Expand Down Expand Up @@ -138,7 +138,7 @@ def _initialize_change_feed_fetcher(self) -> None:
self._fetch_function
)

def _validate_change_feed_state_context(self, change_feed_state_context: Dict[str, Any]) -> None:
def _validate_change_feed_state_context(self, change_feed_state_context: dict[str, Any]) -> None:

if change_feed_state_context.get("continuationPkRangeId") is not None:
# if continuation token is in v1 format, throw exception if feed_range is set
Expand Down
Loading