Merged
Changes from 30 commits
35 commits
1d5b468  remove  (Jul 31, 2025)
76ac6f7  Auto-fix lint and format issues  (Jul 31, 2025)
2d1e2f4  remove unused file  (Jul 31, 2025)
b4a5fec  have declarative availability check support AbstractStream  (Jul 31, 2025)
fc6c6b6  Auto-fix lint and format issues  (Jul 31, 2025)
5fe2e02  mypy  (Jul 31, 2025)
1e8e968  Auto-fix lint and format issues  (Jul 31, 2025)
689e792  Remove RFR stuff  (Aug 1, 2025)
5399436  have bland stream be instantiated as DefaultStream  (Aug 4, 2025)
dff2559  fix test  (Aug 4, 2025)
7dc2164  fix test, format, lint and a bit of mypy  (Aug 4, 2025)
0bfbdfe  mypy  (Aug 4, 2025)
0b454bb  format  (Aug 4, 2025)
13c17f4  remove unused line  (Aug 4, 2025)
0f36dc5  Merge branch 'main' into maxi297/remove-availability-strategy-except-…  (maxi297, Aug 4, 2025)
6f95ebb  Merge branch 'maxi297/remove-availability-strategy-except-for-filebas…  (maxi297, Aug 4, 2025)
c94892a  Merge branch 'maxi297/availability_strategy_to_support_abstract_strea…  (Aug 4, 2025)
fb75765  fix test  (Aug 4, 2025)
c078395  lint  (Aug 4, 2025)
decc557  format  (Aug 4, 2025)
b8daf64  code review  (Aug 4, 2025)
e8edc4b  Merge branch 'main' into maxi297/availability_strategy_to_support_abs…  (maxi297, Aug 5, 2025)
2bc4b30  code review  (Aug 5, 2025)
98e2227  Merge branch 'maxi297/availability_strategy_to_support_abstract_strea…  (Aug 5, 2025)
1079629  supports_file_transfer  (Aug 6, 2025)
7f643e4  format  (Aug 6, 2025)
96f15c3  Merge branch 'main' into maxi297/bland_stream_instantiated_as_default…  (Aug 11, 2025)
11e3a35  format  (Aug 11, 2025)
ebb4b28  more fixes for DefaultStream in Connector Builder  (Aug 11, 2025)
6fef39b  mypy and format  (Aug 11, 2025)
e31fed9  format broke mypy  (Aug 11, 2025)
f290bdf  Merge branch 'main' into maxi297/bland_stream_instantiated_as_default…  (Aug 19, 2025)
f73d79f  fix incomplete merge  (Aug 19, 2025)
c3f81d0  format  (Aug 19, 2025)
dc61e45  lint  (Aug 19, 2025)
6 changes: 5 additions & 1 deletion airbyte_cdk/connector_builder/test_reader/reader.py
@@ -120,7 +120,11 @@ def run_test_read(
         deprecation_warnings: List[LogMessage] = source.deprecation_warnings()
 
         schema_inferrer = SchemaInferrer(
-            self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None,
+            self._pk_to_nested_and_composite_field(
+                stream.primary_key if hasattr(stream, "primary_key") else stream._primary_key
+            )
+            if stream
+            else None,  # type: ignore # We are accessing the private property here as the primary key is not exposed. We should either expose it or use `as_airbyte_stream` to retrieve it as this is the "official" way where it is exposed in the Airbyte protocol
             self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
             if stream
             else None,
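Note: the `hasattr` fallback in this hunk exists because the legacy `Stream` interface exposes `primary_key` as a public property, while the concurrent stream implementation only carries a private `_primary_key`, as the inline comment explains. A minimal, self-contained sketch of the pattern — the two classes below are illustrative stand-ins, not the real CDK classes:

from typing import Any, List, Optional, Union

class LegacyStream:
    # Stand-in for the legacy Stream interface, which exposes primary_key publicly.
    @property
    def primary_key(self) -> Optional[Union[str, List[str]]]:
        return "id"

class ConcurrentStream:
    # Stand-in for a concurrent stream, where the primary key is private today.
    def __init__(self) -> None:
        self._primary_key: List[str] = ["id"]

def extract_primary_key(stream: Any) -> Any:
    # Mirrors the diff: prefer the public attribute; otherwise reach into the
    # private one until it is exposed (or until callers switch to
    # as_airbyte_stream(), where the protocol exposes it officially).
    return stream.primary_key if hasattr(stream, "primary_key") else stream._primary_key

assert extract_primary_key(LegacyStream()) == "id"
assert extract_primary_key(ConcurrentStream()) == ["id"]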
33 changes: 21 additions & 12 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
@@ -5,7 +5,18 @@
 import logging
 from dataclasses import dataclass, field
 from queue import Queue
-from typing import Any, ClassVar, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple
+from typing import (
+    Any,
+    ClassVar,
+    Generic,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Optional,
+    Tuple,
+    Union,
+)
 
 from airbyte_protocol_dataclasses.models import Level
 
@@ -19,10 +30,6 @@
 from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
 from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
-from airbyte_cdk.sources.declarative.extractors import RecordSelector
-from airbyte_cdk.sources.declarative.extractors.record_filter import (
-    ClientSideIncrementalRecordFilterDecorator,
-)
 from airbyte_cdk.sources.declarative.incremental import (
     ConcurrentPerPartitionCursor,
     GlobalSubstreamCursor,
@@ -32,7 +39,6 @@
     PerPartitionWithGlobalCursor,
 )
 from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
-from airbyte_cdk.sources.declarative.models import FileUploader
 from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
     ConcurrencyLevel as ConcurrencyLevelModel,
 )
@@ -117,7 +123,6 @@ def __init__(
         # incremental streams running in full refresh.
         component_factory = ModelToComponentFactory(
             emit_connector_builder_messages=emit_connector_builder_messages,
-            disable_resumable_full_refresh=True,
             message_repository=ConcurrentMessageRepository(queue, message_repository),
             connector_state_manager=self._connector_state_manager,
             max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
@@ -223,7 +228,7 @@ def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> Airbyte
             ]
         )
 
-    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
+    def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]:  # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
         """
         The `streams` method is used as part of the AbstractSource in the following cases:
         * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
@@ -253,6 +258,10 @@ def _group_streams(
             # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
             # so we need to treat them as synchronous
 
+            if isinstance(declarative_stream, AbstractStream):
+                concurrent_streams.append(declarative_stream)
+                continue
+
             supports_file_transfer = (
                 isinstance(declarative_stream, DeclarativeStream)
                 and "file_uploader" in name_to_stream_mapping[declarative_stream.name]
@@ -322,7 +331,7 @@ def _group_streams(
                 partition_generator = StreamSlicerPartitionGenerator(
                     partition_factory=DeclarativePartitionFactory(
                         stream_name=declarative_stream.name,
-                        json_schema=declarative_stream.get_json_schema(),
+                        schema_loader=declarative_stream._schema_loader,  # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish
                         retriever=retriever,
                         message_repository=self.message_repository,
                         max_records_limit=self._limits.max_records
@@ -359,7 +368,7 @@ def _group_streams(
                 partition_generator = StreamSlicerPartitionGenerator(
                     partition_factory=DeclarativePartitionFactory(
                         stream_name=declarative_stream.name,
-                        json_schema=declarative_stream.get_json_schema(),
+                        schema_loader=declarative_stream._schema_loader,  # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish
                         retriever=retriever,
                         message_repository=self.message_repository,
                         max_records_limit=self._limits.max_records
@@ -393,7 +402,7 @@ def _group_streams(
                 partition_generator = StreamSlicerPartitionGenerator(
                     DeclarativePartitionFactory(
                         stream_name=declarative_stream.name,
-                        json_schema=declarative_stream.get_json_schema(),
+                        schema_loader=declarative_stream._schema_loader,  # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish
                         retriever=declarative_stream.retriever,
                         message_repository=self.message_repository,
                         max_records_limit=self._limits.max_records if self._limits else None,
@@ -457,7 +466,7 @@ def _group_streams(
                 partition_generator = StreamSlicerPartitionGenerator(
                     DeclarativePartitionFactory(
                         stream_name=declarative_stream.name,
-                        json_schema=declarative_stream.get_json_schema(),
+                        schema_loader=declarative_stream._schema_loader,  # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish
                         retriever=retriever,
                         message_repository=self.message_repository,
                         max_records_limit=self._limits.max_records if self._limits else None,
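Note: taken together, the `_group_streams` hunks do two things — streams the factory already instantiated as concurrent `AbstractStream`s are passed through untouched, and every legacy path now hands the partition factory a lazy `schema_loader` instead of an eagerly computed `get_json_schema()` result. A rough sketch of the control flow, with simplified stand-in types (only the `AbstractStream` name matches a real CDK class; the rest is illustrative):

from typing import Any, List, Tuple

class AbstractStream:
    # Stand-in for airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream.
    pass

def group_streams(declarative_streams: List[Any]) -> Tuple[List[Any], List[Any]]:
    concurrent_streams: List[Any] = []
    synchronous_streams: List[Any] = []
    for declarative_stream in declarative_streams:
        # Already-concurrent streams (e.g. a bland stream instantiated as
        # DefaultStream) skip all of the legacy adaptation below.
        if isinstance(declarative_stream, AbstractStream):
            concurrent_streams.append(declarative_stream)
            continue
        # Legacy declarative streams still go through cursor/partition wiring;
        # in the real method this is where DeclarativePartitionFactory now
        # receives schema_loader=declarative_stream._schema_loader, so the
        # JSON schema is only resolved when a partition actually needs it.
        synchronous_streams.append(declarative_stream)
    return concurrent_streams, synchronous_streams

concurrent, synchronous = group_streams([AbstractStream(), object()])
assert len(concurrent) == 1 and len(synchronous) == 1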
10 changes: 8 additions & 2 deletions airbyte_cdk/sources/declarative/manifest_declarative_source.py
@@ -8,7 +8,7 @@
 from copy import deepcopy
 from importlib import metadata
 from types import ModuleType
-from typing import Any, Dict, Iterator, List, Mapping, Optional, Set
+from typing import Any, Dict, Iterator, List, Mapping, Optional, Set, Union
 
 import orjson
 import yaml
@@ -66,6 +66,7 @@
 from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
 from airbyte_cdk.sources.declarative.spec.spec import Spec
 from airbyte_cdk.sources.message import MessageRepository
+from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 from airbyte_cdk.sources.streams.core import Stream
 from airbyte_cdk.sources.types import Config, ConnectionDefinition
 from airbyte_cdk.sources.utils.slice_logger import (
@@ -297,7 +298,12 @@ def connection_checker(self) -> ConnectionChecker:
                 f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
             )
 
-    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
+    def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]:  # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
+        """
+        As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream).
+        Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to
+        fully decouple this from the AbstractSource.
+        """
         if self._spec_component:
             self._spec_component.validate_config(config)
 
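Note: with both `streams()` implementations now returning `List[Union[Stream, AbstractStream]]`, callers need a surface both flavors share during the migration. One such surface, hinted at in the reader.py comment above, is `as_airbyte_stream()`. A hypothetical caller — the stand-in classes and `build_catalog_entries` are illustrative, not CDK APIs:

from typing import Any, Dict, List

class _LegacyStream:
    # Illustrative stand-in for airbyte_cdk.sources.streams.core.Stream.
    def as_airbyte_stream(self) -> Dict[str, Any]:
        return {"name": "legacy_stream"}

class _ConcurrentStream:
    # Illustrative stand-in for the concurrent AbstractStream.
    def as_airbyte_stream(self) -> Dict[str, Any]:
        return {"name": "concurrent_stream"}

def build_catalog_entries(streams: List[Any]) -> List[Dict[str, Any]]:
    # Both flavors expose as_airbyte_stream(), the protocol-level view the
    # reader.py comment recommends over reaching into private fields, so
    # discovery-style callers need no isinstance checks at all.
    return [stream.as_airbyte_stream() for stream in streams]

assert build_catalog_entries([_LegacyStream(), _ConcurrentStream()]) == [
    {"name": "legacy_stream"},
    {"name": "concurrent_stream"},
]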