Skip to content

Commit de0b73f

Browse files
committed
fix tests and clean up a few parts of the code
1 parent a3b3563 commit de0b73f

File tree

4 files changed

+72
-9
lines changed

4 files changed

+72
-9
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -806,9 +806,11 @@ def _init_mappings(self) -> None:
806806
def _create_stream_name_to_configured_stream(
807807
configured_catalog: Optional[ConfiguredAirbyteCatalog],
808808
) -> Mapping[str, ConfiguredAirbyteStream]:
809-
if configured_catalog is None:
810-
return {}
811-
return {stream.stream.name: stream for stream in configured_catalog.streams}
809+
return (
810+
{stream.stream.name: stream for stream in configured_catalog.streams}
811+
if configured_catalog
812+
else {}
813+
)
812814

813815
def create_component(
814816
self,

airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,22 +39,21 @@ def get_request_property_chunks(
3939
:param configured_stream: The customer configured stream being synced which is needed to identify which
4040
record fields to query for and emit.
4141
"""
42+
configured_properties = self._get_configured_properties(configured_stream)
43+
4244
fields: Union[Iterable[str], List[str]]
4345
if isinstance(self.property_list, PropertiesFromEndpoint):
4446
fields = self.property_list.get_properties_from_endpoint(stream_slice=stream_slice)
4547
else:
4648
fields = self.property_list if self.property_list else []
4749

48-
configured_properties = self._get_configured_properties(configured_stream)
49-
5050
if self.property_chunking:
5151
yield from self.property_chunking.get_request_property_chunks(
5252
property_fields=fields,
5353
always_include_properties=self.always_include_properties,
5454
configured_properties=configured_properties,
5555
)
5656
else:
57-
# A schema might have no extra properties enabled which is valid and represented by an empty set
5857
if configured_properties is not None:
5958
yield from [[field for field in fields if field in configured_properties]]
6059
else:
@@ -64,7 +63,13 @@ def get_request_property_chunks(
6463
def _get_configured_properties(
6564
configured_stream: Optional[ConfiguredAirbyteStream] = None,
6665
) -> Optional[Set[str]]:
66+
"""
67+
Returns the set of properties that have been selected for the configured stream. The intent is that
68+
we only query for the selected properties rather than all of them, since disabled properties are discarded.
69+
70+
When configured_stream is None, there was no incoming catalog and all fields should be retrieved.
71+
This is different from an empty set, which means the json_schema had no properties and no schema fields were selected.
72+
"""
6773
if configured_stream:
68-
# todo double check that configured catalog only contains enabled fields
6974
return set(configured_stream.stream.json_schema.get("properties", {}).keys())
7075
return None

unit_tests/connector_builder/test_property_chunking.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,12 @@
186186
"json_schema": {
187187
"$schema": "http://json-schema.org/draft-07/schema#",
188188
"type": "object",
189-
"properties": {},
189+
"properties": {
190+
"one": {"type": ["null", "string"]},
191+
"two": {"type": ["null", "string"]},
192+
"three": {"type": ["null", "string"]},
193+
"four": {"type": ["null", "string"]},
194+
},
190195
},
191196
"supported_sync_modes": ["full_refresh"],
192197
"source_defined_cursor": False,

unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ def test_get_request_property_chunks(
7272
property_limit,
7373
expected_property_chunks,
7474
):
75+
configured_properties = set(property_fields)
7576
property_fields = iter(property_fields)
7677
property_chunking = PropertyChunking(
7778
property_limit_type=property_limit_type,
@@ -83,7 +84,9 @@ def test_get_request_property_chunks(
8384

8485
property_chunks = list(
8586
property_chunking.get_request_property_chunks(
86-
property_fields=property_fields, always_include_properties=always_include_properties
87+
property_fields=property_fields,
88+
always_include_properties=always_include_properties,
89+
configured_properties=configured_properties,
8790
)
8891
)
8992

@@ -92,6 +95,54 @@ def test_get_request_property_chunks(
9295
assert property_chunks[i] == expected_property_chunk
9396

9497

98+
def test_get_request_property_chunks_empty_configured_properties():
99+
expected_property_chunks = [["white", "lotus"]]
100+
101+
always_include_properties = ["white", "lotus"]
102+
property_fields = ["maui", "taormina", "koh_samui", "saint_jean_cap_ferrat"]
103+
configured_properties = set()
104+
property_chunking = PropertyChunking(
105+
property_limit_type=PropertyLimitType.property_count,
106+
property_limit=3,
107+
record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}),
108+
config=CONFIG,
109+
parameters={},
110+
)
111+
property_chunks = list(
112+
property_chunking.get_request_property_chunks(
113+
property_fields=property_fields,
114+
always_include_properties=always_include_properties,
115+
configured_properties=configured_properties,
116+
)
117+
)
118+
assert property_chunks == expected_property_chunks
119+
120+
121+
def test_get_request_property_chunks_none_configured_properties():
122+
expected_property_chunks = [
123+
["white", "lotus", "maui", "taormina"],
124+
["white", "lotus", "koh_samui", "saint_jean_cap_ferrat"],
125+
]
126+
127+
always_include_properties = ["white", "lotus"]
128+
property_fields = ["maui", "taormina", "koh_samui", "saint_jean_cap_ferrat"]
129+
property_chunking = PropertyChunking(
130+
property_limit_type=PropertyLimitType.property_count,
131+
property_limit=2,
132+
record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}),
133+
config=CONFIG,
134+
parameters={},
135+
)
136+
property_chunks = list(
137+
property_chunking.get_request_property_chunks(
138+
property_fields=property_fields,
139+
always_include_properties=always_include_properties,
140+
configured_properties=None,
141+
)
142+
)
143+
assert property_chunks == expected_property_chunks
144+
145+
95146
def test_get_merge_key():
96147
record = Record(stream_name="test", data={"id": "0"})
97148
property_chunking = PropertyChunking(

0 commit comments

Comments
 (0)