Skip to content

Commit b052a89

Browse files
committed
Make StreamSlicerTestReadDecorator a child class of the base StreamSlicer
1 parent ee498b1 commit b052a89

File tree

3 files changed

+170
-65
lines changed

3 files changed

+170
-65
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3244,9 +3244,12 @@ def _get_url() -> str:
32443244

32453245
stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})
32463246
if self._should_limit_slices_fetched():
3247-
stream_slicer = StreamSlicerTestReadDecorator(
3248-
wrapped_slicer=stream_slicer,
3249-
maximum_number_of_slices=self._limit_slices_fetched or 5,
3247+
stream_slicer = cast(
3248+
StreamSlicer,
3249+
StreamSlicerTestReadDecorator(
3250+
wrapped_slicer=stream_slicer,
3251+
maximum_number_of_slices=self._limit_slices_fetched or 5,
3252+
),
32503253
)
32513254

32523255
cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None
@@ -3505,9 +3508,12 @@ def _get_job_timeout() -> datetime.timedelta:
35053508

35063509
stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})
35073510
if self._should_limit_slices_fetched():
3508-
stream_slicer = StreamSlicerTestReadDecorator(
3509-
wrapped_slicer=stream_slicer,
3510-
maximum_number_of_slices=self._limit_slices_fetched or 5,
3511+
stream_slicer = cast(
3512+
StreamSlicer,
3513+
StreamSlicerTestReadDecorator(
3514+
wrapped_slicer=stream_slicer,
3515+
maximum_number_of_slices=self._limit_slices_fetched or 5,
3516+
),
35113517
)
35123518

35133519
creation_requester = self._create_component_from_model(

airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py

Lines changed: 1 addition & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,9 @@
66
from itertools import islice
77
from typing import Any, Iterable, Mapping, Optional, Union
88

9+
from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
910
from airbyte_cdk.sources.types import StreamSlice, StreamState
1011

11-
from .stream_slicer import StreamSlicer
12-
1312

1413
@dataclass
1514
class StreamSlicerTestReadDecorator(StreamSlicer):
@@ -24,58 +23,6 @@ class StreamSlicerTestReadDecorator(StreamSlicer):
2423
def stream_slices(self) -> Iterable[StreamSlice]:
2524
return islice(self.wrapped_slicer.stream_slices(), self.maximum_number_of_slices)
2625

27-
def get_request_params(
28-
self,
29-
*,
30-
stream_state: Optional[StreamState] = None,
31-
stream_slice: Optional[StreamSlice] = None,
32-
next_page_token: Optional[Mapping[str, Any]] = None,
33-
) -> Mapping[str, Any]:
34-
return self.wrapped_slicer.get_request_params(
35-
stream_state=stream_state,
36-
stream_slice=stream_slice,
37-
next_page_token=next_page_token,
38-
)
39-
40-
def get_request_headers(
41-
self,
42-
*,
43-
stream_state: Optional[StreamState] = None,
44-
stream_slice: Optional[StreamSlice] = None,
45-
next_page_token: Optional[Mapping[str, Any]] = None,
46-
) -> Mapping[str, Any]:
47-
return self.wrapped_slicer.get_request_headers(
48-
stream_state=stream_state,
49-
stream_slice=stream_slice,
50-
next_page_token=next_page_token,
51-
)
52-
53-
def get_request_body_data(
54-
self,
55-
*,
56-
stream_state: Optional[StreamState] = None,
57-
stream_slice: Optional[StreamSlice] = None,
58-
next_page_token: Optional[Mapping[str, Any]] = None,
59-
) -> Union[Mapping[str, Any], str]:
60-
return self.wrapped_slicer.get_request_body_data(
61-
stream_state=stream_state,
62-
stream_slice=stream_slice,
63-
next_page_token=next_page_token,
64-
)
65-
66-
def get_request_body_json(
67-
self,
68-
*,
69-
stream_state: Optional[StreamState] = None,
70-
stream_slice: Optional[StreamSlice] = None,
71-
next_page_token: Optional[Mapping[str, Any]] = None,
72-
) -> Mapping[str, Any]:
73-
return self.wrapped_slicer.get_request_body_json(
74-
stream_state=stream_state,
75-
stream_slice=stream_slice,
76-
next_page_token=next_page_token,
77-
)
78-
7926
def __getattr__(self, name: str) -> Any:
8027
# Delegate everything else to the wrapped object
8128
return getattr(self.wrapped_slicer, name)

unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py

Lines changed: 157 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,42 @@
44

55
from unittest.mock import Mock
66

7-
from airbyte_cdk.sources.declarative.incremental import PerPartitionWithGlobalCursor
8-
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
9-
from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import (
7+
from airbyte_cdk.sources.declarative.async_job.job_orchestrator import (
8+
AsyncJobOrchestrator,
9+
)
10+
from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
11+
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
12+
from airbyte_cdk.sources.declarative.incremental import (
13+
CursorFactory,
14+
DatetimeBasedCursor,
1015
GlobalSubstreamCursor,
16+
PerPartitionWithGlobalCursor,
1117
)
18+
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
1219
from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import (
1320
StreamSlice,
1421
)
15-
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
22+
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
23+
from airbyte_cdk.sources.declarative.models import (
24+
CustomRetriever,
25+
DeclarativeStream,
26+
ParentStreamConfig,
27+
)
28+
from airbyte_cdk.sources.declarative.partition_routers import (
29+
AsyncJobPartitionRouter,
30+
SubstreamPartitionRouter,
31+
)
1632
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
33+
from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import (
34+
SinglePartitionRouter,
35+
)
1736
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator
37+
from airbyte_cdk.sources.message import NoopMessageRepository
38+
from unit_tests.sources.declarative.async_job.test_integration import MockAsyncJobRepository
1839

1940
CURSOR_SLICE_FIELD = "cursor slice field"
41+
_NO_LIMIT = 10000
42+
DATE_FORMAT = "%Y-%m-%d"
2043

2144

2245
class MockedCursorBuilder:
@@ -43,7 +66,24 @@ def mocked_partition_router():
4366
return Mock(spec=PartitionRouter)
4467

4568

46-
def test_show_as_wrapped_instance():
69+
def date_time_based_cursor_factory() -> DatetimeBasedCursor:
70+
return DatetimeBasedCursor(
71+
start_datetime=MinMaxDatetime(
72+
datetime="2021-01-01", datetime_format=DATE_FORMAT, parameters={}
73+
),
74+
end_datetime=MinMaxDatetime(
75+
datetime="2021-01-05", datetime_format=DATE_FORMAT, parameters={}
76+
),
77+
step="P10Y",
78+
cursor_field=InterpolatedString.create("created_at", parameters={}),
79+
datetime_format=DATE_FORMAT,
80+
cursor_granularity="P1D",
81+
config={},
82+
parameters={},
83+
)
84+
85+
86+
def test_isinstance_global_cursor():
4787
first_partition = {"first_partition_key": "first_partition_value"}
4888
partition_router = mocked_partition_router()
4989
partition_router.stream_slices.return_value = [
@@ -65,7 +105,119 @@ def test_show_as_wrapped_instance():
65105
assert isinstance(wrapped_slicer, GlobalSubstreamCursor)
66106
assert not isinstance(wrapped_slicer, AsyncJobPartitionRouter)
67107
assert not isinstance(wrapped_slicer, PerPartitionWithGlobalCursor)
108+
assert not isinstance(wrapped_slicer, SubstreamPartitionRouter)
68109

69110
assert isinstance(global_cursor, GlobalSubstreamCursor)
70111
assert not isinstance(global_cursor, AsyncJobPartitionRouter)
71112
assert not isinstance(global_cursor, PerPartitionWithGlobalCursor)
113+
assert not isinstance(global_cursor, SubstreamPartitionRouter)
114+
115+
116+
def test_isinstance_global_cursor_async_job_partition_router():
117+
partition_router = AsyncJobPartitionRouter(
118+
stream_slicer=SinglePartitionRouter(parameters={}),
119+
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
120+
MockAsyncJobRepository(),
121+
stream_slices,
122+
JobTracker(_NO_LIMIT),
123+
NoopMessageRepository(),
124+
),
125+
config={},
126+
parameters={},
127+
)
128+
129+
wrapped_slicer = StreamSlicerTestReadDecorator(
130+
wrapped_slicer=partition_router,
131+
maximum_number_of_slices=5,
132+
)
133+
assert isinstance(wrapped_slicer, AsyncJobPartitionRouter)
134+
assert not isinstance(wrapped_slicer, GlobalSubstreamCursor)
135+
assert not isinstance(wrapped_slicer, PerPartitionWithGlobalCursor)
136+
assert not isinstance(wrapped_slicer, SubstreamPartitionRouter)
137+
138+
assert isinstance(partition_router, AsyncJobPartitionRouter)
139+
assert not isinstance(partition_router, GlobalSubstreamCursor)
140+
assert not isinstance(partition_router, PerPartitionWithGlobalCursor)
141+
assert not isinstance(partition_router, SubstreamPartitionRouter)
142+
143+
144+
def test_isinstance_substream_partition_router():
145+
partition_router = SubstreamPartitionRouter(
146+
config={},
147+
parameters={},
148+
parent_stream_configs=[
149+
ParentStreamConfig(
150+
type="ParentStreamConfig",
151+
parent_key="id",
152+
partition_field="id",
153+
stream=DeclarativeStream(
154+
type="DeclarativeStream",
155+
retriever=CustomRetriever(type="CustomRetriever", class_name="a_class_name"),
156+
),
157+
)
158+
],
159+
)
160+
161+
wrapped_slicer = StreamSlicerTestReadDecorator(
162+
wrapped_slicer=partition_router,
163+
maximum_number_of_slices=5,
164+
)
165+
166+
assert isinstance(wrapped_slicer, SubstreamPartitionRouter)
167+
assert not isinstance(wrapped_slicer, GlobalSubstreamCursor)
168+
assert not isinstance(wrapped_slicer, AsyncJobPartitionRouter)
169+
assert not isinstance(wrapped_slicer, PerPartitionWithGlobalCursor)
170+
171+
assert isinstance(partition_router, SubstreamPartitionRouter)
172+
assert not isinstance(partition_router, GlobalSubstreamCursor)
173+
assert not isinstance(partition_router, AsyncJobPartitionRouter)
174+
assert not isinstance(partition_router, PerPartitionWithGlobalCursor)
175+
176+
177+
def test_isinstance_perpartition_with_global_cursor():
178+
partition_router = SubstreamPartitionRouter(
179+
config={},
180+
parameters={},
181+
parent_stream_configs=[
182+
ParentStreamConfig(
183+
type="ParentStreamConfig",
184+
parent_key="id",
185+
partition_field="id",
186+
stream=DeclarativeStream(
187+
type="DeclarativeStream",
188+
retriever=CustomRetriever(type="CustomRetriever", class_name="a_class_name"),
189+
),
190+
)
191+
],
192+
)
193+
date_time_based_cursor = date_time_based_cursor_factory()
194+
195+
cursor_factory = CursorFactory(date_time_based_cursor_factory)
196+
substream_cursor = PerPartitionWithGlobalCursor(
197+
cursor_factory=cursor_factory,
198+
partition_router=partition_router,
199+
stream_cursor=date_time_based_cursor,
200+
)
201+
202+
wrapped_slicer = StreamSlicerTestReadDecorator(
203+
wrapped_slicer=substream_cursor,
204+
maximum_number_of_slices=5,
205+
)
206+
207+
assert isinstance(wrapped_slicer, PerPartitionWithGlobalCursor)
208+
assert not isinstance(wrapped_slicer, GlobalSubstreamCursor)
209+
assert not isinstance(wrapped_slicer, AsyncJobPartitionRouter)
210+
assert not isinstance(wrapped_slicer, SubstreamPartitionRouter)
211+
assert wrapped_slicer._per_partition_cursor._cursor_factory == cursor_factory
212+
assert wrapped_slicer._partition_router == partition_router
213+
assert wrapped_slicer._global_cursor._stream_cursor == date_time_based_cursor
214+
215+
assert isinstance(substream_cursor, PerPartitionWithGlobalCursor)
216+
assert not isinstance(substream_cursor, GlobalSubstreamCursor)
217+
assert not isinstance(substream_cursor, AsyncJobPartitionRouter)
218+
assert not isinstance(substream_cursor, SubstreamPartitionRouter)
219+
assert substream_cursor._per_partition_cursor._cursor_factory == cursor_factory
220+
assert substream_cursor._partition_router == partition_router
221+
assert substream_cursor._global_cursor._stream_cursor == date_time_based_cursor
222+
223+
assert substream_cursor._get_active_cursor() == wrapped_slicer._get_active_cursor()

0 commit comments

Comments
 (0)