Skip to content

Commit a3b3563

Browse files
committed
add configured catalog to SimpleRetriever and only fetch properties that are selected in the json_schema
1 parent 035264c commit a3b3563

File tree

7 files changed

+230
-16
lines changed

7 files changed

+230
-16
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ def __init__(
169169
component_factory = ModelToComponentFactory(
170170
emit_connector_builder_messages=emit_connector_builder_messages,
171171
message_repository=ConcurrentMessageRepository(queue, message_repository),
172+
configured_catalog=catalog,
172173
connector_state_manager=self._connector_state_manager,
173174
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
174175
limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
get_type_hints,
2727
)
2828

29+
from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream
2930
from isodate import parse_duration
3031
from pydantic.v1 import BaseModel
3132
from requests import Response
@@ -42,6 +43,7 @@
4243
AirbyteStateMessage,
4344
AirbyteStateType,
4445
AirbyteStreamState,
46+
ConfiguredAirbyteCatalog,
4547
FailureType,
4648
Level,
4749
StreamDescriptor,
@@ -668,6 +670,7 @@ def __init__(
668670
message_repository: Optional[MessageRepository] = None,
669671
connector_state_manager: Optional[ConnectorStateManager] = None,
670672
max_concurrent_async_job_count: Optional[int] = None,
673+
configured_catalog: Optional[ConfiguredAirbyteCatalog] = None,
671674
):
672675
self._init_mappings()
673676
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
@@ -678,6 +681,9 @@ def __init__(
678681
self._message_repository = message_repository or InMemoryMessageRepository(
679682
self._evaluate_log_level(emit_connector_builder_messages)
680683
)
684+
self._stream_name_to_configured_stream = self._create_stream_name_to_configured_stream(
685+
configured_catalog
686+
)
681687
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
682688
self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None
683689
self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1)
@@ -796,6 +802,14 @@ def _init_mappings(self) -> None:
796802
# Needed for the case where we need to perform a second parse on the fields of a custom component
797803
self.TYPE_NAME_TO_MODEL = {cls.__name__: cls for cls in self.PYDANTIC_MODEL_TO_CONSTRUCTOR}
798804

805+
@staticmethod
806+
def _create_stream_name_to_configured_stream(
807+
configured_catalog: Optional[ConfiguredAirbyteCatalog],
808+
) -> Mapping[str, ConfiguredAirbyteStream]:
809+
if configured_catalog is None:
810+
return {}
811+
return {stream.stream.name: stream for stream in configured_catalog.streams}
812+
799813
def create_component(
800814
self,
801815
model_type: Type[BaseModel],
@@ -3302,6 +3316,8 @@ def _get_url(req: Requester) -> str:
33023316
model.ignore_stream_slicer_parameters_on_paginated_requests or False
33033317
)
33043318

3319+
configured_stream = self._stream_name_to_configured_stream.get(name)
3320+
33053321
if (
33063322
model.partition_router
33073323
and isinstance(model.partition_router, SubstreamPartitionRouterModel)
@@ -3337,6 +3353,7 @@ def _get_url(req: Requester) -> str:
33373353
request_option_provider=request_options_provider,
33383354
cursor=None,
33393355
config=config,
3356+
configured_airbyte_stream=configured_stream,
33403357
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
33413358
parameters=model.parameters or {},
33423359
)
@@ -3358,6 +3375,7 @@ def _get_url(req: Requester) -> str:
33583375
request_option_provider=request_options_provider,
33593376
cursor=None,
33603377
config=config,
3378+
configured_airbyte_stream=configured_stream,
33613379
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
33623380
additional_query_properties=query_properties,
33633381
log_formatter=self._get_log_formatter(log_formatter, name),

airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from dataclasses import InitVar, dataclass
44
from enum import Enum
5-
from typing import Any, Iterable, List, Mapping, Optional
5+
from typing import Any, Iterable, List, Mapping, Optional, Set
66

77
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey
88
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import (
@@ -40,7 +40,10 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
4040
)
4141

4242
def get_request_property_chunks(
43-
self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]
43+
self,
44+
property_fields: Iterable[str],
45+
always_include_properties: Optional[List[str]],
46+
configured_properties: Optional[Set[str]],
4447
) -> Iterable[List[str]]:
4548
if not self.property_limit:
4649
single_property_chunk = list(property_fields)
@@ -53,6 +56,8 @@ def get_request_property_chunks(
5356
for property_field in property_fields:
5457
# If property_limit_type is not defined, we default to property_count which is just an incrementing count
5558
# todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
59+
if configured_properties is not None and property_field not in configured_properties:
60+
continue
5661
property_field_size = (
5762
len(property_field)
5863
+ 3 # The +3 represents the extra characters for encoding the delimiter in between properties

airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
22

33
from dataclasses import InitVar, dataclass
4-
from typing import Any, Iterable, List, Mapping, Optional, Union
4+
from typing import Any, Iterable, List, Mapping, Optional, Set, Union
55

6+
from airbyte_cdk.models import ConfiguredAirbyteStream
67
from airbyte_cdk.sources.declarative.requesters.query_properties import (
78
PropertiesFromEndpoint,
89
PropertyChunking,
@@ -26,23 +27,44 @@ class QueryProperties:
2627
parameters: InitVar[Mapping[str, Any]]
2728

2829
def get_request_property_chunks(
29-
self, stream_slice: Optional[StreamSlice] = None
30+
self,
31+
stream_slice: Optional[StreamSlice] = None,
32+
configured_stream: Optional[ConfiguredAirbyteStream] = None,
3033
) -> Iterable[List[str]]:
3134
"""
3235
Uses the defined property_list to fetch the total set of properties dynamically or from a static list
3336
and based on the resulting properties, performs property chunking if applicable.
3437
:param stream_slice: The StreamSlice of the current partition being processed during the sync. This is included
3538
because subcomponents of QueryProperties can make use of interpolation of the top-level StreamSlice object
39+
:param configured_stream: The customer configured stream being synced which is needed to identify which
40+
record fields to query for and emit.
3641
"""
3742
fields: Union[Iterable[str], List[str]]
3843
if isinstance(self.property_list, PropertiesFromEndpoint):
3944
fields = self.property_list.get_properties_from_endpoint(stream_slice=stream_slice)
4045
else:
4146
fields = self.property_list if self.property_list else []
4247

48+
configured_properties = self._get_configured_properties(configured_stream)
49+
4350
if self.property_chunking:
4451
yield from self.property_chunking.get_request_property_chunks(
45-
property_fields=fields, always_include_properties=self.always_include_properties
52+
property_fields=fields,
53+
always_include_properties=self.always_include_properties,
54+
configured_properties=configured_properties,
4655
)
4756
else:
48-
yield list(fields)
57+
# A schema might have no extra properties enabled which is valid and represented by an empty set
58+
if configured_properties is not None:
59+
yield from [[field for field in fields if field in configured_properties]]
60+
else:
61+
yield list(fields)
62+
63+
@staticmethod
64+
def _get_configured_properties(
65+
configured_stream: Optional[ConfiguredAirbyteStream] = None,
66+
) -> Optional[Set[str]]:
67+
if configured_stream:
68+
# todo double check that configured catalog only contains enabled fields
69+
return set(configured_stream.stream.json_schema.get("properties", {}).keys())
70+
return None

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525

2626
from airbyte_cdk.legacy.sources.declarative.incremental import ResumableFullRefreshCursor
2727
from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
28-
from airbyte_cdk.models import AirbyteMessage
28+
from airbyte_cdk.models import (
29+
AirbyteMessage,
30+
ConfiguredAirbyteStream,
31+
)
2932
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
3033
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
3134
from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import (
@@ -97,6 +100,7 @@ class SimpleRetriever(Retriever):
97100
cursor: Optional[DeclarativeCursor] = None
98101
ignore_stream_slicer_parameters_on_paginated_requests: bool = False
99102
additional_query_properties: Optional[QueryProperties] = None
103+
configured_airbyte_stream: Optional[ConfiguredAirbyteStream] = None
100104
log_formatter: Optional[Callable[[requests.Response], Any]] = None
101105
pagination_tracker_factory: Callable[[], PaginationTracker] = field(
102106
default_factory=lambda: lambda: PaginationTracker()
@@ -389,7 +393,8 @@ def _read_pages(
389393
and self.additional_query_properties.property_chunking
390394
):
391395
for properties in self.additional_query_properties.get_request_property_chunks(
392-
stream_slice=stream_slice
396+
stream_slice=stream_slice,
397+
configured_stream=self.configured_airbyte_stream,
393398
):
394399
stream_slice = StreamSlice(
395400
partition=stream_slice.partition or {},

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,13 @@
1212
import freezegun
1313
import pytest
1414
import requests
15+
from airbyte_protocol_dataclasses.models.airbyte_protocol import (
16+
AirbyteStream,
17+
ConfiguredAirbyteCatalog,
18+
ConfiguredAirbyteStream,
19+
DestinationSyncMode,
20+
SyncMode,
21+
)
1522
from freezegun.api import FakeDatetime
1623
from pydantic.v1 import ValidationError
1724

@@ -230,6 +237,28 @@ def test_create_component_type_mismatch():
230237
factory.create_component(CheckStreamModel, manifest["check"], {})
231238

232239

240+
def test_create_component_with_configured_catalog():
241+
configured_catalog = ConfiguredAirbyteCatalog(
242+
streams=[
243+
ConfiguredAirbyteStream(
244+
stream=AirbyteStream(
245+
name="test",
246+
json_schema={"type": "object", "properties": {"id": {"type": "string"}}},
247+
supported_sync_modes=[SyncMode.full_refresh],
248+
),
249+
sync_mode=SyncMode.full_refresh,
250+
destination_sync_mode=DestinationSyncMode.overwrite,
251+
)
252+
]
253+
)
254+
255+
factory_with_catalog = ModelToComponentFactory(configured_catalog=configured_catalog)
256+
257+
assert factory_with_catalog._stream_name_to_configured_stream == {
258+
"test": configured_catalog.streams[0]
259+
}
260+
261+
233262
def test_full_config_stream():
234263
content = """
235264
decoder:
@@ -1217,7 +1246,7 @@ def test_stream_with_custom_requester_and_query_properties(requests_mock):
12171246
http_method: "GET"
12181247
request_parameters:
12191248
not_query: 1
1220-
query:
1249+
query:
12211250
type: QueryProperties
12221251
property_list:
12231252
- id

0 commit comments

Comments
 (0)