Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
1d5b468
remove
Jul 31, 2025
76ac6f7
Auto-fix lint and format issues
Jul 31, 2025
2d1e2f4
remove unused file
Jul 31, 2025
b4a5fec
have declarative availability check support AbstractStream
Jul 31, 2025
fc6c6b6
Auto-fix lint and format issues
Jul 31, 2025
5fe2e02
mypy
Jul 31, 2025
1e8e968
Auto-fix lint and format issues
Jul 31, 2025
689e792
Remove RFR stuff
Aug 1, 2025
5399436
have bland stream be instantiated as DefaultStream
Aug 4, 2025
dff2559
fix test
Aug 4, 2025
7dc2164
fix test, format, lint and a bit of mypy
Aug 4, 2025
0bfbdfe
mypy
Aug 4, 2025
0b454bb
format
Aug 4, 2025
13c17f4
remove unused line
Aug 4, 2025
0f36dc5
Merge branch 'main' into maxi297/remove-availability-strategy-except-…
maxi297 Aug 4, 2025
6f95ebb
Merge branch 'maxi297/remove-availability-strategy-except-for-filebas…
maxi297 Aug 4, 2025
c94892a
Merge branch 'maxi297/availability_strategy_to_support_abstract_strea…
Aug 4, 2025
fb75765
fix test
Aug 4, 2025
c078395
lint
Aug 4, 2025
decc557
format
Aug 4, 2025
b8daf64
code review
Aug 4, 2025
e8edc4b
Merge branch 'main' into maxi297/availability_strategy_to_support_abs…
maxi297 Aug 5, 2025
2bc4b30
code review
Aug 5, 2025
98e2227
Merge branch 'maxi297/availability_strategy_to_support_abstract_strea…
Aug 5, 2025
1079629
supports_file_transfer
Aug 6, 2025
7f643e4
format
Aug 6, 2025
96f15c3
Merge branch 'main' into maxi297/bland_stream_instantiated_as_default…
Aug 11, 2025
11e3a35
format
Aug 11, 2025
ebb4b28
more fixes for DefaultStream in Connector Builder
Aug 11, 2025
6fef39b
mypy and format
Aug 11, 2025
e31fed9
format broke mypy
Aug 11, 2025
f290bdf
Merge branch 'main' into maxi297/bland_stream_instantiated_as_default…
Aug 19, 2025
f73d79f
fix incomplete merge
Aug 19, 2025
c3f81d0
format
Aug 19, 2025
dc61e45
lint
Aug 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
#

import logging
import traceback
from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Tuple
from typing import Any, List, Mapping, Tuple, Union

from airbyte_cdk import AbstractSource
from airbyte_cdk.sources.abstract_source import AbstractSource
from airbyte_cdk.sources.declarative.checks.check_stream import evaluate_availability
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy


Expand All @@ -34,20 +35,16 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
def check_connection(
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
) -> Tuple[bool, Any]:
streams = source.streams(config=config)
streams: List[Union[Stream, AbstractStream]] = source.streams(config=config) # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker

if len(streams) == 0:
return False, f"No streams to connect to from source {source}"
if not self.use_check_availability:
return True, None

availability_strategy = HttpAvailabilityStrategy()

try:
for stream in streams[: min(self.stream_count, len(streams))]:
stream_is_available, reason = availability_strategy.check_availability(
stream, logger
)
stream_is_available, reason = evaluate_availability(stream, logger)
if not stream_is_available:
logger.warning(f"Stream {stream.name} is not available: {reason}")
return False, reason
Expand Down
43 changes: 31 additions & 12 deletions airbyte_cdk/sources/declarative/checks/check_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,30 @@
import logging
import traceback
from dataclasses import InitVar, dataclass
from typing import Any, Dict, List, Mapping, Optional, Tuple
from typing import Any, Dict, List, Mapping, Optional, Tuple, Union

from airbyte_cdk import AbstractSource
from airbyte_cdk.sources.abstract_source import AbstractSource
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy


def evaluate_availability(
    stream: Union[Stream, AbstractStream], logger: logging.Logger
) -> Tuple[bool, Optional[str]]:
    """
    Report whether `stream` is available, whichever stream flavour it is.

    During the transition to AbstractStream both kinds are accepted:

    * a legacy `Stream` is checked through a freshly created `HttpAvailabilityStrategy`,
      which is handed the stream and the `logger`;
    * an `AbstractStream` is asked directly through its own `check_availability()` and the
      `is_available` / `reason` attributes of the result are returned.

    :param stream: the stream to check
    :param logger: logger forwarded to the legacy availability strategy (unused for AbstractStream)
    :return: a tuple of (is available, reason)
    :raises ValueError: if `stream` is neither a `Stream` nor an `AbstractStream`
    """
    # Legacy streams are tested first, exactly as before, so an object that satisfies both
    # isinstance checks keeps going through the HTTP availability strategy.
    if isinstance(stream, Stream):
        legacy_strategy = HttpAvailabilityStrategy()
        return legacy_strategy.check_availability(stream, logger)

    if not isinstance(stream, AbstractStream):
        raise ValueError(f"Unsupported stream type {type(stream)}")

    result = stream.check_availability()
    return result.is_available, result.reason


@dataclass(frozen=True)
class DynamicStreamCheckConfig:
"""Defines the configuration for dynamic stream during connection checking. This class specifies
Expand Down Expand Up @@ -51,7 +68,7 @@ def check_connection(
) -> Tuple[bool, Any]:
"""Checks the connection to the source and its streams."""
try:
streams = source.streams(config=config)
streams: List[Union[Stream, AbstractStream]] = source.streams(config=config) # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker
if not streams:
return False, f"No streams to connect to from source {source}"
except Exception as error:
Expand Down Expand Up @@ -82,13 +99,15 @@ def check_connection(
return True, None

def _check_stream_availability(
self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger
self,
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
stream_name: str,
logger: logging.Logger,
) -> Tuple[bool, Any]:
"""Checks if streams are available."""
availability_strategy = HttpAvailabilityStrategy()
try:
stream = stream_name_to_stream[stream_name]
stream_is_available, reason = availability_strategy.check_availability(stream, logger)
stream_is_available, reason = evaluate_availability(stream, logger)
if not stream_is_available:
message = f"Stream {stream_name} is not available: {reason}"
logger.warning(message)
Expand All @@ -98,7 +117,10 @@ def _check_stream_availability(
return True, None

def _check_dynamic_streams_availability(
self, source: AbstractSource, stream_name_to_stream: Dict[str, Any], logger: logging.Logger
self,
source: AbstractSource,
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
logger: logging.Logger,
) -> Tuple[bool, Any]:
"""Checks the availability of dynamic streams."""
dynamic_streams = source.resolved_manifest.get("dynamic_streams", []) # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method
Expand Down Expand Up @@ -135,18 +157,15 @@ def _map_generated_streams(
def _check_generated_streams_availability(
self,
generated_streams: List[Dict[str, Any]],
stream_name_to_stream: Dict[str, Any],
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
logger: logging.Logger,
max_count: int,
) -> Tuple[bool, Any]:
"""Checks availability of generated dynamic streams."""
availability_strategy = HttpAvailabilityStrategy()
for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]:
stream = stream_name_to_stream[declarative_stream["name"]]
try:
stream_is_available, reason = availability_strategy.check_availability(
stream, logger
)
stream_is_available, reason = evaluate_availability(stream, logger)
if not stream_is_available:
message = f"Dynamic Stream {stream.name} is not available: {reason}"
logger.warning(message)
Expand Down
38 changes: 20 additions & 18 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,17 @@
#

import logging
from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple
from typing import (
Any,
Generic,
Iterator,
List,
Mapping,
MutableMapping,
Optional,
Tuple,
Union,
)

from airbyte_cdk.models import (
AirbyteCatalog,
Expand All @@ -15,10 +25,6 @@
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.extractors import RecordSelector
from airbyte_cdk.sources.declarative.extractors.record_filter import (
ClientSideIncrementalRecordFilterDecorator,
)
from airbyte_cdk.sources.declarative.incremental import (
ConcurrentPerPartitionCursor,
GlobalSubstreamCursor,
Expand All @@ -28,7 +34,6 @@
PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.models import FileUploader
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ConcurrencyLevel as ConcurrencyLevelModel,
)
Expand All @@ -52,9 +57,6 @@
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
AlwaysAvailableAvailabilityStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
Expand Down Expand Up @@ -87,7 +89,6 @@ def __init__(
# incremental streams running in full refresh.
component_factory = component_factory or ModelToComponentFactory(
emit_connector_builder_messages=emit_connector_builder_messages,
disable_resumable_full_refresh=True,
connector_state_manager=self._connector_state_manager,
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
)
Expand Down Expand Up @@ -183,7 +184,7 @@ def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> Airbyte
]
)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]: # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
"""
The `streams` method is used as part of the AbstractSource in the following cases:
* ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
Expand Down Expand Up @@ -213,6 +214,10 @@ def _group_streams(
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
# so we need to treat them as synchronous

if isinstance(declarative_stream, AbstractStream):
concurrent_streams.append(declarative_stream)
continue

supports_file_transfer = (
isinstance(declarative_stream, DeclarativeStream)
and "file_uploader" in name_to_stream_mapping[declarative_stream.name]
Expand Down Expand Up @@ -282,7 +287,7 @@ def _group_streams(
partition_generator = StreamSlicerPartitionGenerator(
partition_factory=DeclarativePartitionFactory(
declarative_stream.name,
declarative_stream.get_json_schema(),
declarative_stream._schema_loader, # type: ignore # I know it's private property but the public one is optional and we will remove this code soonish
retriever,
self.message_repository,
),
Expand Down Expand Up @@ -313,7 +318,7 @@ def _group_streams(
partition_generator = StreamSlicerPartitionGenerator(
partition_factory=DeclarativePartitionFactory(
declarative_stream.name,
declarative_stream.get_json_schema(),
declarative_stream._schema_loader, # type: ignore # I know it's private property but the public one is optional and we will remove this code soonish
retriever,
self.message_repository,
),
Expand All @@ -325,7 +330,6 @@ def _group_streams(
partition_generator=partition_generator,
name=declarative_stream.name,
json_schema=declarative_stream.get_json_schema(),
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
cursor_field=cursor.cursor_field.cursor_field_key
if hasattr(cursor, "cursor_field")
Expand All @@ -344,7 +348,7 @@ def _group_streams(
partition_generator = StreamSlicerPartitionGenerator(
DeclarativePartitionFactory(
declarative_stream.name,
declarative_stream.get_json_schema(),
declarative_stream._schema_loader, # type: ignore # I know it's private property but the public one is optional and we will remove this code soonish
declarative_stream.retriever,
self.message_repository,
),
Expand All @@ -362,7 +366,6 @@ def _group_streams(
partition_generator=partition_generator,
name=declarative_stream.name,
json_schema=declarative_stream.get_json_schema(),
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
cursor_field=None,
logger=self.logger,
Expand Down Expand Up @@ -405,7 +408,7 @@ def _group_streams(
partition_generator = StreamSlicerPartitionGenerator(
DeclarativePartitionFactory(
declarative_stream.name,
declarative_stream.get_json_schema(),
declarative_stream._schema_loader, # type: ignore # I know it's private property but the public one is optional and we will remove this code soonish
retriever,
self.message_repository,
),
Expand All @@ -417,7 +420,6 @@ def _group_streams(
partition_generator=partition_generator,
name=declarative_stream.name,
json_schema=declarative_stream.get_json_schema(),
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
logger=self.logger,
Expand Down
10 changes: 8 additions & 2 deletions airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from copy import deepcopy
from importlib import metadata
from types import ModuleType
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set, Union

import orjson
import yaml
Expand Down Expand Up @@ -66,6 +66,7 @@
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.spec.spec import Spec
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.types import Config, ConnectionDefinition
from airbyte_cdk.sources.utils.slice_logger import (
Expand Down Expand Up @@ -297,7 +298,12 @@ def connection_checker(self) -> ConnectionChecker:
f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]: # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
"""
As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream).
Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to
fully decouple this from the AbstractSource.
"""
if self._spec_component:
self._spec_component.validate_config(config)

Expand Down
Loading
Loading