Skip to content

Commit 15d8769

Browse files
committed
introduce a JsonSchemaPropertySelector component to allow for more flexibility to modify selected properties
1 parent de0b73f commit 15d8769

File tree

13 files changed

+443
-59
lines changed

13 files changed

+443
-59
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2520,6 +2520,34 @@ definitions:
25202520
type:
25212521
type: string
25222522
enum: [JsonlDecoder]
2523+
JsonSchemaPropertySelector:
2524+
title: Json Schema Property Selector
2525+
description: When configured, the JSON schema supplied in the catalog containing which columns are selected for the current stream will be used to reduce which query properties will be included in the outbound API request. This can improve the performance of API requests, especially for those requiring multiple requests to get a complete record.
2526+
type: object
2527+
required:
2528+
- type
2529+
properties:
2530+
type:
2531+
type: string
2532+
enum: [JsonSchemaPropertySelector]
2533+
transformations:
2534+
title: Transformations
2535+
description: A list of transformations to be applied on the customer configured schema that will be used to filter out unselected fields when specifying query properties for API requests.
2536+
linkable: true
2537+
type: array
2538+
items:
2539+
anyOf:
2540+
- "$ref": "#/definitions/AddFields"
2541+
- "$ref": "#/definitions/RemoveFields"
2542+
- "$ref": "#/definitions/KeysToLower"
2543+
- "$ref": "#/definitions/KeysToSnakeCase"
2544+
- "$ref": "#/definitions/FlattenFields"
2545+
- "$ref": "#/definitions/DpathFlattenFields"
2546+
- "$ref": "#/definitions/KeysReplace"
2547+
- "$ref": "#/definitions/CustomTransformation"
2548+
$parameters:
2549+
type: object
2550+
additionalProperties: true
25232551
KeysToLower:
25242552
title: Keys to Lower Case
25252553
description: A transformation that renames all keys to lower case.
@@ -3410,6 +3438,10 @@ definitions:
34103438
title: Property Chunking
34113439
description: Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.
34123440
"$ref": "#/definitions/PropertyChunking"
3441+
property_selector:
3442+
title: Property Selector
3443+
description: Defines where to look for and which query properties that should be sent in outbound API requests. For example, you can specify that only the selected columns of a stream should be in the request.
3444+
"$ref": "#/definitions/JsonSchemaPropertySelector"
34133445
$parameters:
34143446
type: object
34153447
additionalProperties: true
@@ -3746,7 +3778,7 @@ definitions:
37463778
properties:
37473779
type:
37483780
type: string
3749-
enum: [ PaginationReset ]
3781+
enum: [PaginationReset]
37503782
action:
37513783
type: string
37523784
enum:
@@ -3763,7 +3795,7 @@ definitions:
37633795
properties:
37643796
type:
37653797
type: string
3766-
enum: [ PaginationResetLimits ]
3798+
enum: [PaginationResetLimits]
37673799
number_of_records:
37683800
type: integer
37693801
GzipDecoder:

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
13
# generated by datamodel-codegen:
24
# filename: declarative_component_schema.yaml
35

@@ -2029,6 +2031,29 @@ class SessionTokenRequestApiKeyAuthenticator(BaseModel):
20292031
)
20302032

20312033

2034+
class JsonSchemaPropertySelector(BaseModel):
2035+
type: Literal["JsonSchemaPropertySelector"]
2036+
transformations: Optional[
2037+
List[
2038+
Union[
2039+
AddFields,
2040+
RemoveFields,
2041+
KeysToLower,
2042+
KeysToSnakeCase,
2043+
FlattenFields,
2044+
DpathFlattenFields,
2045+
KeysReplace,
2046+
CustomTransformation,
2047+
]
2048+
]
2049+
] = Field(
2050+
None,
2051+
description="A list of transformations to be applied on the customer configured schema that will be used to filter out unselected fields when specifying query properties for API requests.",
2052+
title="Transformations",
2053+
)
2054+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2055+
2056+
20322057
class ListPartitionRouter(BaseModel):
20332058
type: Literal["ListPartitionRouter"]
20342059
cursor_field: str = Field(
@@ -2799,6 +2824,11 @@ class QueryProperties(BaseModel):
27992824
description="Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.",
28002825
title="Property Chunking",
28012826
)
2827+
property_selector: Optional[JsonSchemaPropertySelector] = Field(
2828+
None,
2829+
description="Defines where to look for and which query properties that should be sent in outbound API requests. For example, you can specify that only the selected columns of a stream should be in the request.",
2830+
title="Property Selector",
2831+
)
28022832
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
28032833

28042834

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,9 @@
316316
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
317317
JsonlDecoder as JsonlDecoderModel,
318318
)
319+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
320+
JsonSchemaPropertySelector as JsonSchemaPropertySelectorModel,
321+
)
319322
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
320323
JwtAuthenticator as JwtAuthenticatorModel,
321324
)
@@ -503,6 +506,9 @@
503506
from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import (
504507
PropertyLimitType,
505508
)
509+
from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector import (
510+
JsonSchemaPropertySelector,
511+
)
506512
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import (
507513
GroupByKey,
508514
)
@@ -740,6 +746,7 @@ def _init_mappings(self) -> None:
740746
InlineSchemaLoaderModel: self.create_inline_schema_loader,
741747
JsonDecoderModel: self.create_json_decoder,
742748
JsonlDecoderModel: self.create_jsonl_decoder,
749+
JsonSchemaPropertySelectorModel: self.create_json_schema_property_selector,
743750
GzipDecoderModel: self.create_gzip_decoder,
744751
KeysToLowerModel: self.create_keys_to_lower_transformation,
745752
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
@@ -3003,7 +3010,7 @@ def create_property_chunking(
30033010
)
30043011

30053012
def create_query_properties(
3006-
self, model: QueryPropertiesModel, config: Config, **kwargs: Any
3013+
self, model: QueryPropertiesModel, config: Config, *, stream_name: str, **kwargs: Any
30073014
) -> QueryProperties:
30083015
if isinstance(model.property_list, list):
30093016
property_list = model.property_list
@@ -3020,10 +3027,43 @@ def create_query_properties(
30203027
else None
30213028
)
30223029

3030+
property_selector = (
3031+
self._create_component_from_model(
3032+
model=model.property_selector, config=config, stream_name=stream_name, **kwargs
3033+
)
3034+
if model.property_selector
3035+
else None
3036+
)
3037+
30233038
return QueryProperties(
30243039
property_list=property_list,
30253040
always_include_properties=model.always_include_properties,
30263041
property_chunking=property_chunking,
3042+
property_selector=property_selector,
3043+
config=config,
3044+
parameters=model.parameters or {},
3045+
)
3046+
3047+
def create_json_schema_property_selector(
3048+
self,
3049+
model: JsonSchemaPropertySelectorModel,
3050+
config: Config,
3051+
*,
3052+
stream_name: str,
3053+
**kwargs: Any,
3054+
) -> JsonSchemaPropertySelector:
3055+
configured_stream = self._stream_name_to_configured_stream.get(stream_name)
3056+
3057+
transformations = []
3058+
if model.transformations:
3059+
for transformation_model in model.transformations:
3060+
transformations.append(
3061+
self._create_component_from_model(model=transformation_model, config=config)
3062+
)
3063+
3064+
return JsonSchemaPropertySelector(
3065+
configured_stream=configured_stream,
3066+
properties_transformations=transformations,
30273067
config=config,
30283068
parameters=model.parameters or {},
30293069
)
@@ -3251,7 +3291,7 @@ def _get_url(req: Requester) -> str:
32513291

32523292
if len(query_properties_definitions) == 1:
32533293
query_properties = self._create_component_from_model(
3254-
model=query_properties_definitions[0], config=config
3294+
model=query_properties_definitions[0], stream_name=name, config=config
32553295
)
32563296

32573297
# Removes QueryProperties components from the interpolated mappings because it has been designed
@@ -3277,11 +3317,13 @@ def _get_url(req: Requester) -> str:
32773317

32783318
query_properties = self.create_query_properties(
32793319
model=query_properties_definition,
3320+
stream_name=name,
32803321
config=config,
32813322
)
32823323
elif hasattr(model.requester, "query_properties") and model.requester.query_properties:
32833324
query_properties = self.create_query_properties(
32843325
model=model.requester.query_properties,
3326+
stream_name=name,
32853327
config=config,
32863328
)
32873329

@@ -3318,8 +3360,6 @@ def _get_url(req: Requester) -> str:
33183360
model.ignore_stream_slicer_parameters_on_paginated_requests or False
33193361
)
33203362

3321-
configured_stream = self._stream_name_to_configured_stream.get(name)
3322-
33233363
if (
33243364
model.partition_router
33253365
and isinstance(model.partition_router, SubstreamPartitionRouterModel)
@@ -3355,7 +3395,6 @@ def _get_url(req: Requester) -> str:
33553395
request_option_provider=request_options_provider,
33563396
cursor=None,
33573397
config=config,
3358-
configured_airbyte_stream=configured_stream,
33593398
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
33603399
parameters=model.parameters or {},
33613400
)
@@ -3377,7 +3416,6 @@ def _get_url(req: Requester) -> str:
33773416
request_option_provider=request_options_provider,
33783417
cursor=None,
33793418
config=config,
3380-
configured_airbyte_stream=configured_stream,
33813419
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
33823420
additional_query_properties=query_properties,
33833421
log_formatter=self._get_log_formatter(log_formatter, name),
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
3+
from dataclasses import InitVar, dataclass, field
4+
from typing import Any, List, Mapping, Optional, Set
5+
6+
from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream
7+
8+
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
9+
from airbyte_cdk.sources.types import Config
10+
11+
12+
@dataclass
13+
class JsonSchemaPropertySelector:
14+
"""
15+
A class that contains a list of transformations to apply to properties.
16+
"""
17+
18+
configured_stream: ConfiguredAirbyteStream
19+
config: Config
20+
parameters: InitVar[Mapping[str, Any]]
21+
properties_transformations: List[RecordTransformation] = field(default_factory=lambda: [])
22+
23+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
24+
self._parameters = parameters
25+
26+
def select(self) -> Set[str]:
27+
properties = set()
28+
for schema_property in self.configured_stream.stream.json_schema.get(
29+
"properties", {}
30+
).keys():
31+
if self.properties_transformations:
32+
for transformation in self.properties_transformations:
33+
transformation.transform(
34+
schema_property, # type: ignore # record has type Mapping[str, Any], but Dict[str, Any] expected
35+
config=self.config,
36+
)
37+
properties.add(schema_property)
38+
return properties
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
3+
from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.json_schema_property_selector import (
4+
JsonSchemaPropertySelector,
5+
)
6+
from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.property_selector import (
7+
PropertySelector,
8+
)
9+
10+
__all__ = ["JsonSchemaPropertySelector", "PropertySelector"]
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
3+
from dataclasses import InitVar, dataclass, field
4+
from typing import Any, List, Mapping, Optional, Set
5+
6+
from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream
7+
8+
from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.property_selector import (
9+
PropertySelector,
10+
)
11+
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
12+
from airbyte_cdk.sources.types import Config
13+
14+
15+
@dataclass
16+
class JsonSchemaPropertySelector(PropertySelector):
17+
"""
18+
A class that contains a list of transformations to apply to properties.
19+
"""
20+
21+
config: Config
22+
parameters: InitVar[Mapping[str, Any]]
23+
# For other non-read operations, there is no configured catalog and therefore no schema selection
24+
configured_stream: Optional[ConfiguredAirbyteStream] = None
25+
properties_transformations: List[RecordTransformation] = field(default_factory=lambda: [])
26+
27+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
28+
self._parameters = parameters
29+
30+
def select(self) -> Optional[Set[str]]:
31+
"""
32+
Returns the set of properties that have been selected for the configured stream. The intent being that
33+
we should only query for selected properties not all since disabled properties are discarded.
34+
35+
When configured_stream is None, then there was no incoming catalog and all fields should be retrieved.
36+
This is different from the empty set where the json_schema was empty and no schema fields were selected.
37+
"""
38+
39+
# For CHECK/DISCOVER operations, there is no catalog and therefor no configured stream or selected
40+
# columns. In this case we return None which is interpreted by the QueryProperties component to not
41+
# perform any filtering of schema properties and fetch all of them
42+
if self.configured_stream is None:
43+
return None
44+
45+
schema_properties = self.configured_stream.stream.json_schema.get("properties", {})
46+
if self.properties_transformations:
47+
for transformation in self.properties_transformations:
48+
transformation.transform(
49+
schema_properties, # type: ignore # record has type Mapping[str, Any], but Dict[str, Any] expected
50+
config=self.config,
51+
)
52+
return set(schema_properties.keys())
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
3+
from abc import ABC, abstractmethod
4+
from dataclasses import dataclass
5+
from typing import Optional, Set
6+
7+
8+
@dataclass
9+
class PropertySelector(ABC):
10+
"""
11+
Describes the interface for selecting and transforming properties from a configured stream's schema
12+
to determine which properties should be queried from the API.
13+
"""
14+
15+
@abstractmethod
16+
def select(self) -> Optional[Set[str]]:
17+
"""
18+
Selects and returns the set of properties that should be queried from the API based on the
19+
configured stream's schema and any applicable transformations.
20+
21+
Returns:
22+
Set[str]: The set of property names to query
23+
"""
24+
pass

0 commit comments

Comments
 (0)