Skip to content

chore: run pyupgrade on codebase #483

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
  •  
  •  
  •  
9 changes: 5 additions & 4 deletions airbyte_cdk/config_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
)

import time
from collections.abc import MutableMapping
from copy import copy
from typing import Any, List, MutableMapping
from typing import Any

import orjson

Expand Down Expand Up @@ -38,7 +39,7 @@ def __init__(
non_observed_mapping[item] = ObservedDict(value, observer)

# Observe nested list of dicts
if isinstance(value, List):
if isinstance(value, list):
for i, sub_value in enumerate(value):
if isinstance(sub_value, MutableMapping):
value[i] = ObservedDict(sub_value, observer)
Expand All @@ -52,11 +53,11 @@ def __setitem__(self, item: Any, value: Any) -> None:
previous_value = self.get(item)
if isinstance(value, MutableMapping):
value = ObservedDict(value, self.observer)
if isinstance(value, List):
if isinstance(value, list):
for i, sub_value in enumerate(value):
if isinstance(sub_value, MutableMapping):
value[i] = ObservedDict(sub_value, self.observer)
super(ObservedDict, self).__setitem__(item, value)
super().__setitem__(item, value)
if self.update_on_unchanged_value or value != previous_value:
self.observer.update()

Expand Down
7 changes: 4 additions & 3 deletions airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import os
import pkgutil
from abc import ABC, abstractmethod
from typing import Any, Generic, Mapping, Optional, Protocol, TypeVar
from collections.abc import Mapping
from typing import Any, Generic, Protocol, TypeVar

import yaml

Expand All @@ -19,7 +20,7 @@
)


def load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
def load_optional_package_file(package: str, filename: str) -> bytes | None:
"""Gets a resource from a package, returning None if it does not exist"""
try:
return pkgutil.get_data(package, filename)
Expand Down Expand Up @@ -52,7 +53,7 @@ def read_config(config_path: str) -> Mapping[str, Any]:

@staticmethod
def _read_json_file(file_path: str) -> Any:
with open(file_path, "r") as file:
with open(file_path) as file:
contents = file.read()

try:
Expand Down
7 changes: 4 additions & 3 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
#


from collections.abc import Mapping
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Mapping
from typing import Any

from airbyte_cdk.connector_builder.test_reader import TestReader
from airbyte_cdk.models import (
Expand Down Expand Up @@ -74,7 +75,7 @@ def read_stream(
source: DeclarativeSource,
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
state: List[AirbyteStateMessage],
state: list[AirbyteStateMessage],
limits: TestLimits,
) -> AirbyteMessage:
try:
Expand Down Expand Up @@ -128,7 +129,7 @@ def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits)
for stream in streams:
stream["dynamic_stream_name"] = None

mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
mapped_streams: dict[str, list[dict[str, Any]]] = {}
for stream in source.dynamic_streams:
generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])

Expand Down
13 changes: 7 additions & 6 deletions airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@


import sys
from typing import Any, List, Mapping, Optional, Tuple
from collections.abc import Mapping
from typing import Any

import orjson

Expand All @@ -31,8 +32,8 @@


def get_config_and_catalog_from_args(
args: List[str],
) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
args: list[str],
) -> tuple[str, Mapping[str, Any], ConfiguredAirbyteCatalog | None, Any]:
# TODO: Add functionality for the `debug` logger.
# Currently, no one `debug` level log will be displayed during `read` a stream for a connector created through `connector-builder`.
parsed_args = AirbyteEntrypoint.parse_args(args)
Expand Down Expand Up @@ -71,8 +72,8 @@ def handle_connector_builder_request(
source: ManifestDeclarativeSource,
command: str,
config: Mapping[str, Any],
catalog: Optional[ConfiguredAirbyteCatalog],
state: List[AirbyteStateMessage],
catalog: ConfiguredAirbyteCatalog | None,
state: list[AirbyteStateMessage],
limits: TestLimits,
) -> AirbyteMessage:
if command == "resolve_manifest":
Expand All @@ -88,7 +89,7 @@ def handle_connector_builder_request(
raise ValueError(f"Unrecognized command {command}.")


def handle_request(args: List[str]) -> str:
def handle_request(args: list[str]) -> str:
command, config, catalog, state = get_config_and_catalog_from_args(args)
limits = get_limits(config)
source = create_source(config, limits)
Expand Down
50 changes: 25 additions & 25 deletions airbyte_cdk/connector_builder/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,30 @@
#

from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from typing import Any


@dataclass
class HttpResponse:
status: int
body: Optional[str] = None
headers: Optional[Dict[str, Any]] = None
body: str | None = None
headers: dict[str, Any] | None = None


@dataclass
class HttpRequest:
url: str
headers: Optional[Dict[str, Any]]
headers: dict[str, Any] | None
http_method: str
body: Optional[str] = None
body: str | None = None


@dataclass
class LogMessage:
message: str
level: str
internal_message: Optional[str] = None
stacktrace: Optional[str] = None
internal_message: str | None = None
stacktrace: str | None = None


@dataclass
Expand All @@ -40,34 +40,34 @@ class AuxiliaryRequest:

@dataclass
class StreamReadPages:
records: List[object]
request: Optional[HttpRequest] = None
response: Optional[HttpResponse] = None
records: list[object]
request: HttpRequest | None = None
response: HttpResponse | None = None


@dataclass
class StreamReadSlices:
pages: List[StreamReadPages]
slice_descriptor: Optional[Dict[str, Any]]
state: Optional[List[Dict[str, Any]]] = None
auxiliary_requests: Optional[List[AuxiliaryRequest]] = None
pages: list[StreamReadPages]
slice_descriptor: dict[str, Any] | None
state: list[dict[str, Any]] | None = None
auxiliary_requests: list[AuxiliaryRequest] | None = None


@dataclass
class StreamRead(object):
logs: List[LogMessage]
slices: List[StreamReadSlices]
class StreamRead:
logs: list[LogMessage]
slices: list[StreamReadSlices]
test_read_limit_reached: bool
auxiliary_requests: List[AuxiliaryRequest]
inferred_schema: Optional[Dict[str, Any]]
inferred_datetime_formats: Optional[Dict[str, str]]
latest_config_update: Optional[Dict[str, Any]]
auxiliary_requests: list[AuxiliaryRequest]
inferred_schema: dict[str, Any] | None
inferred_datetime_formats: dict[str, str] | None
latest_config_update: dict[str, Any] | None


@dataclass
class StreamReadRequestBody:
manifest: Dict[str, Any]
manifest: dict[str, Any]
stream: str
config: Dict[str, Any]
state: Optional[Dict[str, Any]]
record_limit: Optional[int]
config: dict[str, Any]
state: dict[str, Any] | None
record_limit: int | None
51 changes: 26 additions & 25 deletions airbyte_cdk/connector_builder/test_reader/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
#

import json
from collections.abc import Mapping
from copy import deepcopy
from json import JSONDecodeError
from typing import Any, Dict, List, Mapping, Optional
from typing import Any

from airbyte_cdk.connector_builder.models import (
AuxiliaryRequest,
Expand Down Expand Up @@ -35,7 +36,7 @@
# -------


def airbyte_message_to_json(message: AirbyteMessage) -> Optional[Dict[str, JsonType]]:
def airbyte_message_to_json(message: AirbyteMessage) -> dict[str, JsonType] | None:
"""
Converts an AirbyteMessage to a JSON dictionary if its type is LOG.

Expand Down Expand Up @@ -64,7 +65,7 @@ def airbyte_message_to_json(message: AirbyteMessage) -> Optional[Dict[str, JsonT
return None


def clean_config(config: Dict[str, Any]) -> Dict[str, Any]:
def clean_config(config: dict[str, Any]) -> dict[str, Any]:
"""
Cleans the configuration dictionary by removing all keys that start with a double underscore.

Expand All @@ -85,7 +86,7 @@ def clean_config(config: Dict[str, Any]) -> Dict[str, Any]:
return cleaned_config


def create_request_from_log_message(json_http_message: Dict[str, Any]) -> HttpRequest:
def create_request_from_log_message(json_http_message: dict[str, Any]) -> HttpRequest:
"""
Creates an HttpRequest object from the provided JSON-formatted log message.

Expand Down Expand Up @@ -129,7 +130,7 @@ def create_request_from_log_message(json_http_message: Dict[str, Any]) -> HttpRe
)


def create_response_from_log_message(json_http_message: Dict[str, Any]) -> HttpResponse:
def create_response_from_log_message(json_http_message: dict[str, Any]) -> HttpResponse:
"""
Generate an HttpResponse instance from a JSON log message containing HTTP response details.

Expand Down Expand Up @@ -174,7 +175,7 @@ def parse_json(log_message: AirbyteLogMessage) -> JsonType:
return None


def parse_slice_description(log_message: str) -> Dict[str, Any]:
def parse_slice_description(log_message: str) -> dict[str, Any]:
"""
Parses a log message containing a JSON payload and returns it as a dictionary.

Expand Down Expand Up @@ -203,7 +204,7 @@ def parse_slice_description(log_message: str) -> Dict[str, Any]:
def should_close_page(
at_least_one_page_in_group: bool,
message: AirbyteMessage,
json_message: Optional[Dict[str, Any]],
json_message: dict[str, Any] | None,
) -> bool:
"""
Determines whether a page should be closed based on its content and state.
Expand Down Expand Up @@ -269,7 +270,7 @@ def should_close_page_for_slice(at_least_one_page_in_group: bool, message: Airby
return at_least_one_page_in_group and should_process_slice_descriptor(message)


def is_page_http_request(json_message: Optional[Dict[str, Any]]) -> bool:
def is_page_http_request(json_message: dict[str, Any] | None) -> bool:
"""
Determines whether a given JSON message represents a page HTTP request.

Expand All @@ -291,7 +292,7 @@ def is_page_http_request(json_message: Optional[Dict[str, Any]]) -> bool:
return is_http_log(json_message) and not is_auxiliary_http_request(json_message)


def is_http_log(message: Dict[str, JsonType]) -> bool:
def is_http_log(message: dict[str, JsonType]) -> bool:
"""
Determine if the provided log message represents an HTTP log.

Expand All @@ -308,7 +309,7 @@ def is_http_log(message: Dict[str, JsonType]) -> bool:
return bool(message.get("http", False))


def is_auxiliary_http_request(message: Optional[Dict[str, Any]]) -> bool:
def is_auxiliary_http_request(message: dict[str, Any] | None) -> bool:
"""
Determines if the provided message represents an auxiliary HTTP request.

Expand Down Expand Up @@ -415,10 +416,10 @@ def is_state_message(message: AirbyteMessage) -> bool:


def handle_current_slice(
current_slice_pages: List[StreamReadPages],
current_slice_descriptor: Optional[Dict[str, Any]] = None,
latest_state_message: Optional[Dict[str, Any]] = None,
auxiliary_requests: Optional[List[AuxiliaryRequest]] = None,
current_slice_pages: list[StreamReadPages],
current_slice_descriptor: dict[str, Any] | None = None,
latest_state_message: dict[str, Any] | None = None,
auxiliary_requests: list[AuxiliaryRequest] | None = None,
) -> StreamReadSlices:
"""
Handles the current slice by packaging its pages, descriptor, and state into a StreamReadSlices instance.
Expand All @@ -441,10 +442,10 @@ def handle_current_slice(


def handle_current_page(
current_page_request: Optional[HttpRequest],
current_page_response: Optional[HttpResponse],
current_slice_pages: List[StreamReadPages],
current_page_records: List[Mapping[str, Any]],
current_page_request: HttpRequest | None,
current_page_response: HttpResponse | None,
current_slice_pages: list[StreamReadPages],
current_page_records: list[Mapping[str, Any]],
) -> tuple[None, None]:
"""
Closes the current page by appending its request, response, and records
Expand Down Expand Up @@ -472,7 +473,7 @@ def handle_current_page(
return None, None


def handle_auxiliary_request(json_message: Dict[str, JsonType]) -> AuxiliaryRequest:
def handle_auxiliary_request(json_message: dict[str, JsonType]) -> AuxiliaryRequest:
"""
Parses the provided JSON message and constructs an AuxiliaryRequest object by extracting
relevant fields from nested dictionaries.
Expand Down Expand Up @@ -517,10 +518,10 @@ def handle_auxiliary_request(json_message: Dict[str, JsonType]) -> AuxiliaryRequ

def handle_log_message(
message: AirbyteMessage,
json_message: Dict[str, JsonType] | None,
json_message: dict[str, JsonType] | None,
at_least_one_page_in_group: bool,
current_page_request: Optional[HttpRequest],
current_page_response: Optional[HttpResponse],
current_page_request: HttpRequest | None,
current_page_response: HttpResponse | None,
) -> LOG_MESSAGES_OUTPUT_TYPE:
"""
Process a log message by handling both HTTP-specific and auxiliary log entries.
Expand Down Expand Up @@ -571,7 +572,7 @@ def handle_record_message(
schema_inferrer: SchemaInferrer,
datetime_format_inferrer: DatetimeFormatInferrer,
records_count: int,
current_page_records: List[Mapping[str, Any]],
current_page_records: list[Mapping[str, Any]],
) -> int:
"""
Processes an Airbyte record message by updating the current batch and accumulating schema and datetime format information.
Expand Down Expand Up @@ -600,7 +601,7 @@ def handle_record_message(
# -------


def get_airbyte_cdk_from_message(json_message: Dict[str, JsonType]) -> dict: # type: ignore
def get_airbyte_cdk_from_message(json_message: dict[str, JsonType]) -> dict: # type: ignore
"""
Retrieves the "airbyte_cdk" dictionary from the provided JSON message.

Expand Down Expand Up @@ -658,7 +659,7 @@ def get_auxiliary_request_title_prefix(stream: dict) -> str: # type: ignore
return "Parent stream: " if stream.get("is_substream", False) else ""


def get_http_property_from_message(json_message: Dict[str, JsonType]) -> dict: # type: ignore
def get_http_property_from_message(json_message: dict[str, JsonType]) -> dict: # type: ignore
"""
Retrieves the "http" dictionary from the provided JSON message.

Expand Down
Loading
Loading