Skip to content

Commit ebb4b28

Browse files
author
maxime.c
committed
more fixes for DefaultStream in Connector Builder
1 parent 11e3a35 commit ebb4b28

File tree

3 files changed

+24
-9
lines changed

3 files changed

+24
-9
lines changed

airbyte_cdk/connector_builder/test_reader/reader.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ def run_test_read(
120120
deprecation_warnings: List[LogMessage] = source.deprecation_warnings()
121121

122122
schema_inferrer = SchemaInferrer(
123-
self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None,
123+
self._pk_to_nested_and_composite_field(stream.primary_key if hasattr(stream, "primary_key") else stream._primary_key) if stream else None,
124124
self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
125125
if stream
126126
else None,

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2072,7 +2072,6 @@ def create_declarative_stream(
20722072
if (
20732073
isinstance(combined_slicers, PartitionRouter)
20742074
and not is_parent
2075-
and not self._emit_connector_builder_messages
20762075
):
20772076
# We are starting to migrate streams to instantiate directly the DefaultStream instead of instantiating the
20782077
# DeclarativeStream and assembling the DefaultStream from that. The plan is the following:
@@ -2089,7 +2088,13 @@ def create_declarative_stream(
20892088
retriever,
20902089
self._message_repository,
20912090
),
2092-
combined_slicers,
2091+
stream_slicer=cast(
2092+
StreamSlicer,
2093+
StreamSlicerTestReadDecorator(
2094+
wrapped_slicer=combined_slicers,
2095+
maximum_number_of_slices=self._limit_slices_fetched or 5,
2096+
),
2097+
),
20932098
)
20942099
return DefaultStream(
20952100
partition_generator=partition_generator,

unit_tests/connector_builder/test_connector_builder_handler.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import json
88
import logging
99
import os
10-
from typing import List, Literal
10+
from typing import List, Literal, Union
1111
from unittest import mock
1212
from unittest.mock import MagicMock, patch
1313

@@ -17,7 +17,6 @@
1717

1818
from airbyte_cdk import connector_builder
1919
from airbyte_cdk.connector_builder.connector_builder_handler import (
20-
TestLimits,
2120
create_source,
2221
get_limits,
2322
resolve_manifest,
@@ -60,6 +59,7 @@
6059
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
6160
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
6261
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator
62+
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
6363
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
6464
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets, update_secrets
6565
from unit_tests.connector_builder.utils import create_configured_catalog
@@ -440,6 +440,14 @@
440440
}
441441

442442

443+
def get_retriever(stream: Union[DeclarativeStream, DefaultStream]):
444+
return (
445+
stream.retriever
446+
if isinstance(stream, DeclarativeStream)
447+
else stream._stream_partition_generator._partition_factory._retriever
448+
)
449+
450+
443451
@pytest.fixture
444452
def valid_resolve_manifest_config_file(tmp_path):
445453
config_file = tmp_path / "config.json"
@@ -1130,8 +1138,9 @@ def test_read_source(mock_http_stream):
11301138

11311139
streams = source.streams(config)
11321140
for s in streams:
1133-
assert isinstance(s.retriever, SimpleRetriever)
1134-
assert isinstance(s.retriever.stream_slicer, StreamSlicerTestReadDecorator)
1141+
retriever = get_retriever(s)
1142+
assert isinstance(retriever, SimpleRetriever)
1143+
assert isinstance(retriever.stream_slicer, StreamSlicerTestReadDecorator)
11351144

11361145

11371146
@patch.object(
@@ -1177,8 +1186,9 @@ def test_read_source_single_page_single_slice(mock_http_stream):
11771186

11781187
streams = source.streams(config)
11791188
for s in streams:
1180-
assert isinstance(s.retriever, SimpleRetriever)
1181-
assert isinstance(s.retriever.stream_slicer, StreamSlicerTestReadDecorator)
1189+
retriever = get_retriever(s)
1190+
assert isinstance(retriever, SimpleRetriever)
1191+
assert isinstance(retriever.stream_slicer, StreamSlicerTestReadDecorator)
11821192

11831193

11841194
@pytest.mark.parametrize(

0 commit comments

Comments
 (0)