Skip to content

Commit 55ea028

Browse files
maxi297octavia-squidington-iii
andauthored
fix: hubspot property chunking (#797)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 8423ee4 commit 55ea028

File tree

2 files changed

+87
-10
lines changed

2 files changed

+87
-10
lines changed

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -384,10 +384,7 @@ def _read_pages(
384384

385385
response = None
386386
try:
387-
if (
388-
self.additional_query_properties
389-
and self.additional_query_properties.property_chunking
390-
):
387+
if self.additional_query_properties:
391388
for properties in self.additional_query_properties.get_request_property_chunks(
392389
stream_slice=stream_slice
393390
):
@@ -401,15 +398,19 @@ def _read_pages(
401398
)
402399

403400
for current_record in records_generator_fn(response):
404-
merge_key = (
405-
self.additional_query_properties.property_chunking.get_merge_key(
401+
if self.additional_query_properties.property_chunking:
402+
merge_key = self.additional_query_properties.property_chunking.get_merge_key(
406403
current_record
407404
)
408-
)
409-
if merge_key:
410-
_deep_merge(merged_records[merge_key], current_record)
405+
if merge_key:
406+
_deep_merge(merged_records[merge_key], current_record)
407+
else:
408+
# We should still emit records even if the record did not have a merge key
409+
pagination_tracker.observe(current_record)
410+
last_page_size += 1
411+
last_record = current_record
412+
yield current_record
411413
else:
412-
# We should still emit records even if the record did not have a merge key
413414
pagination_tracker.observe(current_record)
414415
last_page_size += 1
415416
last_record = current_record

unit_tests/sources/declarative/retrievers/test_simple_retriever.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,6 +1118,82 @@ def test_simple_retriever_with_additional_query_properties():
11181118
assert actual_records == expected_records
11191119

11201120

1121+
def test_simple_retriever_with_additional_query_properties_but_without_property_chunking():
1122+
stream_name = "stream_name"
1123+
expected_records = [
1124+
Record(
1125+
data={"id": "a", "field": "value_first_page"},
1126+
associated_slice=None,
1127+
stream_name=stream_name,
1128+
),
1129+
Record(
1130+
data={"id": "b", "field": "value_second_page"},
1131+
associated_slice=None,
1132+
stream_name=stream_name,
1133+
),
1134+
]
1135+
1136+
stream_slice = StreamSlice(cursor_slice={}, partition={})
1137+
1138+
response = requests.Response()
1139+
response.status_code = 200
1140+
response._content = json.dumps({"data": [{"whatever": 1}]}).encode("utf-8")
1141+
1142+
requester = MagicMock()
1143+
requester.send_request.side_effect = [
1144+
response,
1145+
response,
1146+
]
1147+
1148+
record_selector = MagicMock()
1149+
record_selector.select_records.side_effect = [
1150+
[
1151+
Record(
1152+
data={"id": "a", "field": "value_first_page"},
1153+
associated_slice=None,
1154+
stream_name=stream_name,
1155+
),
1156+
],
1157+
[
1158+
Record(
1159+
data={"id": "b", "field": "value_second_page"},
1160+
associated_slice=None,
1161+
stream_name=stream_name,
1162+
),
1163+
],
1164+
]
1165+
1166+
query_properties = QueryProperties(
1167+
property_list=["first_name", "last_name", "nonary", "bracelet"],
1168+
always_include_properties=[],
1169+
property_chunking=None,
1170+
config=config,
1171+
parameters={},
1172+
)
1173+
1174+
paginator = _mock_paginator()
1175+
paginator.next_page_token.side_effect = [{"next_page_token": 1}, None]
1176+
1177+
retriever = SimpleRetriever(
1178+
name=stream_name,
1179+
primary_key=primary_key,
1180+
requester=requester,
1181+
record_selector=record_selector,
1182+
additional_query_properties=query_properties,
1183+
paginator=paginator,
1184+
parameters={},
1185+
config={},
1186+
)
1187+
1188+
actual_records = [
1189+
r for r in retriever.read_records(records_schema={}, stream_slice=stream_slice)
1190+
]
1191+
1192+
assert len(actual_records) == 2
1193+
assert actual_records == expected_records
1194+
assert requester.send_request.call_args_list[0].kwargs["stream_slice"].extra_fields
1195+
1196+
11211197
def test_simple_retriever_with_additional_query_properties_single_chunk():
11221198
stream_name = "stream_name"
11231199
expected_records = [

0 commit comments

Comments
 (0)