Skip to content

Commit b4a5fec

Browse files
committed
have declarative availability check support AbstractStream
1 parent 2d1e2f4 commit b4a5fec

File tree

7 files changed

+174
-25
lines changed

7 files changed

+174
-25
lines changed

airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
#
44

55
import logging
6-
import traceback
76
from dataclasses import InitVar, dataclass
8-
from typing import Any, List, Mapping, Tuple
7+
from typing import Any, List, Mapping, Tuple, Union
98

10-
from airbyte_cdk import AbstractSource
9+
from airbyte_cdk.sources.abstract_source import AbstractSource
10+
from airbyte_cdk.sources.declarative.checks.check_stream import evaluate_availability
1111
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
12+
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
1213
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
1314

1415

@@ -34,20 +35,16 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3435
def check_connection(
3536
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
3637
) -> Tuple[bool, Any]:
37-
streams = source.streams(config=config)
38+
streams: List[Union[Stream, AbstractStream]] = source.streams(config=config) # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker
3839

3940
if len(streams) == 0:
4041
return False, f"No streams to connect to from source {source}"
4142
if not self.use_check_availability:
4243
return True, None
4344

44-
availability_strategy = HttpAvailabilityStrategy()
45-
4645
try:
4746
for stream in streams[: min(self.stream_count, len(streams))]:
48-
stream_is_available, reason = availability_strategy.check_availability(
49-
stream, logger
50-
)
47+
stream_is_available, reason = evaluate_availability(stream, logger)
5148
if not stream_is_available:
5249
logger.warning(f"Stream {stream.name} is not available: {reason}")
5350
return False, reason

airbyte_cdk/sources/declarative/checks/check_stream.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,28 @@
55
import logging
66
import traceback
77
from dataclasses import InitVar, dataclass
8-
from typing import Any, Dict, List, Mapping, Optional, Tuple
8+
from typing import Any, Dict, List, Mapping, Optional, Tuple, Union
99

10-
from airbyte_cdk import AbstractSource
10+
from airbyte_cdk.sources.streams.core import Stream
11+
from airbyte_cdk.sources.abstract_source import AbstractSource
1112
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
13+
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
1214
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
1315

1416

17+
def evaluate_availability(
    stream: Union[Stream, AbstractStream], logger: logging.Logger
) -> Tuple[bool, Optional[str]]:
    """
    Evaluate the availability of a stream, dispatching on the stream's type.

    During the transition period both Stream and AbstractStream are supported, until everything is migrated to
    AbstractStream.

    :param stream: The stream to check, either a Stream or an AbstractStream
    :param logger: Logger handed to HttpAvailabilityStrategy for Stream instances; not used for AbstractStream
    :return: A tuple of (is_available, reason)
    :raises ValueError: If the stream is neither a Stream nor an AbstractStream
    """
    # Stream instances go through the HTTP availability strategy.
    if isinstance(stream, Stream):
        legacy_strategy = HttpAvailabilityStrategy()
        return legacy_strategy.check_availability(stream, logger)

    if not isinstance(stream, AbstractStream):
        raise ValueError(f"Unsupported stream type {type(stream)}")

    # AbstractStream reports its own availability; unpack it into the tuple shape callers expect.
    result = stream.check_availability()
    return result.is_available, result.reason
28+
29+
1530
@dataclass(frozen=True)
1631
class DynamicStreamCheckConfig:
1732
"""Defines the configuration for dynamic stream during connection checking. This class specifies
@@ -51,7 +66,7 @@ def check_connection(
5166
) -> Tuple[bool, Any]:
5267
"""Checks the connection to the source and its streams."""
5368
try:
54-
streams = source.streams(config=config)
69+
streams: List[Union[Stream, AbstractStream]] = source.streams(config=config) # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker
5570
if not streams:
5671
return False, f"No streams to connect to from source {source}"
5772
except Exception as error:
@@ -82,13 +97,12 @@ def check_connection(
8297
return True, None
8398

8499
def _check_stream_availability(
85-
self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger
100+
self, stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]], stream_name: str, logger: logging.Logger
86101
) -> Tuple[bool, Any]:
87102
"""Checks if streams are available."""
88-
availability_strategy = HttpAvailabilityStrategy()
89103
try:
90104
stream = stream_name_to_stream[stream_name]
91-
stream_is_available, reason = availability_strategy.check_availability(stream, logger)
105+
stream_is_available, reason = evaluate_availability(stream, logger)
92106
if not stream_is_available:
93107
message = f"Stream {stream_name} is not available: {reason}"
94108
logger.warning(message)
@@ -98,7 +112,7 @@ def _check_stream_availability(
98112
return True, None
99113

100114
def _check_dynamic_streams_availability(
101-
self, source: AbstractSource, stream_name_to_stream: Dict[str, Any], logger: logging.Logger
115+
self, source: AbstractSource, stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]], logger: logging.Logger
102116
) -> Tuple[bool, Any]:
103117
"""Checks the availability of dynamic streams."""
104118
dynamic_streams = source.resolved_manifest.get("dynamic_streams", []) # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method
@@ -135,18 +149,15 @@ def _map_generated_streams(
135149
def _check_generated_streams_availability(
136150
self,
137151
generated_streams: List[Dict[str, Any]],
138-
stream_name_to_stream: Dict[str, Any],
152+
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
139153
logger: logging.Logger,
140154
max_count: int,
141155
) -> Tuple[bool, Any]:
142156
"""Checks availability of generated dynamic streams."""
143-
availability_strategy = HttpAvailabilityStrategy()
144157
for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]:
145158
stream = stream_name_to_stream[declarative_stream["name"]]
146159
try:
147-
stream_is_available, reason = availability_strategy.check_availability(
148-
stream, logger
149-
)
160+
stream_is_available, reason = evaluate_availability(stream, logger)
150161
if not stream_is_available:
151162
message = f"Dynamic Stream {stream.name} is not available: {reason}"
152163
logger.warning(message)

airbyte_cdk/sources/streams/concurrent/abstract_stream.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from airbyte_cdk.models import AirbyteStream
1111
from airbyte_cdk.sources.source import ExperimentalClassWarning
12+
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
1213
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
1314
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
1415

@@ -87,3 +88,9 @@ def cursor(self) -> Cursor:
8788
"""
8889
:return: The cursor associated with this stream.
8990
"""
91+
92+
@abstractmethod
def check_availability(self) -> StreamAvailability:
    """
    Check whether this stream is available.

    :return: A StreamAvailability telling if the stream is available and, if it is not, why
    """
airbyte_cdk/sources/streams/concurrent/availability_strategy.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from abc import ABC, abstractmethod
6+
from typing import Optional
7+
8+
9+
class StreamAvailability:
    """
    Result of a stream availability check: an availability flag and, when the stream is not available, the reason.

    Instances can be built with the `available()` / `unavailable(reason)` factories or directly through the constructor.
    """

    @classmethod
    def available(cls) -> "StreamAvailability":
        """
        :return: An instance flagged as available, with no reason attached
        """
        return cls(True)

    @classmethod
    def unavailable(cls, reason: str) -> "StreamAvailability":
        """
        :param reason: Non-empty message describing why the stream is not available
        :return: An instance flagged as unavailable carrying `reason`
        """
        # Use `cls` (like `available`) so that subclasses get an instance of their own type.
        return cls(False, reason)

    def __init__(self, available: bool, reason: Optional[str] = None) -> None:
        """
        :param available: Whether the stream is available
        :param reason: Why the stream is not available; required (non-empty) when `available` is False
        :raises AssertionError: If `available` is False and no reason is provided
        """
        # Raised explicitly instead of using an `assert` statement so the check still runs under `python -O`,
        # while keeping the exception type existing callers may observe. Validation happens before any
        # attribute is set.
        if not available and not reason:
            raise AssertionError("A reason needs to be provided if the stream is not available")

        self._available = available
        self._reason = reason

    @property
    def is_available(self) -> bool:
        """
        :return: True if the stream is available. False if the stream is not
        """
        return self._available

    @property
    def reason(self) -> Optional[str]:
        """
        :return: A message describing why the stream is not available. If the stream is available, this should return None.
        """
        return self._reason

airbyte_cdk/sources/streams/concurrent/default_stream.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,15 @@
88

99
from airbyte_cdk.models import AirbyteStream, SyncMode
1010
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
11+
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
1112
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
1213
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
1314
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
15+
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
1416

1517

1618
class DefaultStream(AbstractStream):
19+
1720
def __init__(
1821
self,
1922
partition_generator: PartitionGenerator,
@@ -91,3 +94,29 @@ def log_stream_sync_configuration(self) -> None:
9194
@property
9295
def cursor(self) -> Cursor:
9396
return self._cursor
97+
98+
def check_availability(self) -> StreamAvailability:
    """
    Check stream availability by attempting to read the first record of the stream.

    The first partition is generated, then the first record of that partition is requested.

    :return: An available StreamAvailability when the first partition could be read (even if it produced no
        records). An unavailable one when no partition was generated or when an AirbyteTracedException was raised
        while generating partitions or reading records. Any other exception is propagated to the caller.
    """
    try:
        partition = next(iter(self.generate_partitions()))
    except StopIteration:
        # NOTE: carried over from the legacy availability check; its relevance here has not been confirmed.
        # The partition generator can produce nothing at all (which is different from producing [None]), e.g.
        # when a substream derives its slices from the records of a parent stream that is empty.
        return StreamAvailability.unavailable(
            f"Cannot attempt to connect to stream {self.name} - no stream slices were found"
        )
    except AirbyteTracedException as error:
        # StreamAvailability.unavailable rejects an empty reason, so fall back to another description when the
        # traced exception carries no user-facing message. `internal_message` is read defensively because the
        # exception class is declared outside of this module.
        return StreamAvailability.unavailable(
            error.message or getattr(error, "internal_message", None) or "<no error message>"
        )

    try:
        next(iter(partition.read()))
        return StreamAvailability.available()
    except StopIteration:
        # An empty partition still proves that the stream could be reached.
        self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.")
        return StreamAvailability.available()
    except AirbyteTracedException as error:
        # Same fallback as above: never hand an empty reason to StreamAvailability.unavailable.
        return StreamAvailability.unavailable(
            error.message or getattr(error, "internal_message", None) or "<no error message>"
        )

unit_tests/sources/declarative/checks/test_check_stream.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
ConcurrentDeclarativeSource,
1818
)
1919
from airbyte_cdk.sources.streams.http import HttpStream
20+
from airbyte_cdk.sources.streams.core import Stream
2021
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
2122

2223
logger = logging.getLogger("test")
@@ -45,7 +46,7 @@
4546
def test_check_stream_with_slices_as_list(
4647
test_name, record, streams_to_check, stream_slice, expectation, slices_as_list
4748
):
48-
stream = MagicMock()
49+
stream = MagicMock(spec=Stream)
4950
stream.name = "s1"
5051
stream.availability_strategy = None
5152
if slices_as_list:
@@ -77,7 +78,7 @@ def mock_read_records(responses, default_response=None, **kwargs):
7778

7879

7980
def test_check_empty_stream():
80-
stream = MagicMock()
81+
stream = MagicMock(spec=Stream)
8182
stream.name = "s1"
8283
stream.read_records.return_value = iter([])
8384
stream.stream_slices.return_value = iter([None])
@@ -91,7 +92,7 @@ def test_check_empty_stream():
9192

9293

9394
def test_check_stream_with_no_stream_slices_aborts():
94-
stream = MagicMock()
95+
stream = MagicMock(spec=Stream)
9596
stream.name = "s1"
9697
stream.stream_slices.return_value = iter([])
9798

unit_tests/sources/streams/concurrent/test_default_stream.py

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,22 @@
44
import unittest
55
from unittest.mock import Mock
66

7+
import pytest
8+
79
from airbyte_cdk.models import AirbyteStream, SyncMode
810
from airbyte_cdk.sources.message import InMemoryMessageRepository
911
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor
1012
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
13+
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
14+
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
15+
from airbyte_cdk.sources.types import Record
16+
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
1117

1218

1319
class ThreadBasedConcurrentStreamTest(unittest.TestCase):
1420
def setUp(self):
15-
self._partition_generator = Mock()
21+
self._partition_generator = Mock(spec=PartitionGenerator)
22+
self._partition = Mock(spec=Partition)
1623
self._name = "name"
1724
self._json_schema = {}
1825
self._primary_key = []
@@ -243,3 +250,62 @@ def test_as_airbyte_stream_with_file_transfer_support(self):
243250
actual_airbyte_stream = stream.as_airbyte_stream()
244251

245252
assert actual_airbyte_stream == expected_airbyte_stream
253+
254+
def test_given_no_partitions_when_get_availability_then_unavailable(self) -> None:
    """A partition generator that produces no partition makes the stream unavailable."""
    self._partition_generator.generate.return_value = []

    availability = self._stream.check_availability()

    # Identity check instead of `== False` (PEP 8) so that only a real boolean False passes.
    assert availability.is_available is False
    assert "no stream slices were found" in availability.reason
261+
262+
def test_given_AirbyteTracedException_when_generating_partitions_when_get_availability_then_unavailable(self) -> None:
    """An AirbyteTracedException raised while generating partitions is reported as the unavailability reason."""
    error_message = "error while generating partitions"
    self._partition_generator.generate.side_effect = AirbyteTracedException(message=error_message)

    availability = self._stream.check_availability()

    # Identity check instead of `== False` (PEP 8) so that only a real boolean False passes.
    assert availability.is_available is False
    assert error_message in availability.reason
270+
271+
def test_given_unknown_error_when_generating_partitions_when_get_availability_then_raise(self) -> None:
    """
    An exception other than AirbyteTracedException raised while generating partitions is expected to propagate.

    It is unclear why only AirbyteTracedException is handled; this keeps feature compatibility with
    HttpAvailabilityStrategy.
    """
    unexpected_error = ValueError()
    self._partition_generator.generate.side_effect = unexpected_error

    with pytest.raises(ValueError):
        self._stream.check_availability()
278+
279+
def test_given_no_records_when_get_availability_then_available(self) -> None:
    """A partition that can be read but yields no records still makes the stream available."""
    self._partition_generator.generate.return_value = [self._partition]
    self._partition.read.return_value = []

    availability = self._stream.check_availability()

    # Identity check instead of `== True` (PEP 8) so that only a real boolean True passes.
    assert availability.is_available is True
286+
287+
def test_given_records_when_get_availability_then_available(self) -> None:
    """A partition that yields at least one record makes the stream available."""
    self._partition_generator.generate.return_value = [self._partition]
    self._partition.read.return_value = [Mock(spec=Record)]

    availability = self._stream.check_availability()

    # Identity check instead of `== True` (PEP 8) so that only a real boolean True passes.
    assert availability.is_available is True
294+
295+
def test_given_AirbyteTracedException_when_reading_records_when_get_availability_then_unavailable(self) -> None:
    """An AirbyteTracedException raised while reading records is reported as the unavailability reason."""
    self._partition_generator.generate.return_value = [self._partition]
    error_message = "error while reading records"
    self._partition.read.side_effect = AirbyteTracedException(message=error_message)

    availability = self._stream.check_availability()

    # Identity check instead of `== False` (PEP 8) so that only a real boolean False passes.
    assert availability.is_available is False
    # Mirror the partition-generation test: the message of the exception must be surfaced as the reason.
    assert error_message in availability.reason
303+
304+
def test_given_unknown_error_when_reading_record_when_get_availability_then_raise(self) -> None:
    """
    An exception other than AirbyteTracedException raised while reading records is expected to propagate.

    I'm not sure why we handle AirbyteTracedException but not other exceptions but this is to keep feature compatibility with HttpAvailabilityStrategy
    """
    # Partition generation must succeed so that the failure really comes from reading the partition; raising from
    # `generate` would only duplicate the partition-generation test and leave the read path untested.
    self._partition_generator.generate.return_value = [self._partition]
    self._partition.read.side_effect = ValueError()

    with pytest.raises(ValueError):
        self._stream.check_availability()

0 commit comments

Comments
 (0)