diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 153a5d105..f521186d0 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -850,6 +850,10 @@ definitions: examples: - "created_at" - "{{ config['record_cursor'] }}" + allow_catalog_defined_cursor_field: + title: Allow Catalog Defined Cursor Field + description: Whether the cursor allows users to override the default cursor_field when configuring their connection. The user defined cursor field will be specified from within the configured catalog. + type: boolean start_value: title: Start Value description: The value that determines the earliest record that should be synced. @@ -913,6 +917,10 @@ definitions: examples: - "created_at" - "{{ config['record_cursor'] }}" + allow_catalog_defined_cursor_field: + title: Allow Catalog Defined Cursor Field + description: Whether the cursor allows users to override the default cursor_field when configuring their connection. The user defined cursor field will be specified from within the configured catalog. + type: boolean cursor_datetime_formats: title: Cursor Datetime Formats type: array diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 0f5c0f1f9..b78a07021 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,3 +1,5 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+ # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -1584,6 +1586,11 @@ class IncrementingCountCursor(BaseModel): examples=["created_at", "{{ config['record_cursor'] }}"], title="Cursor Field", ) + allow_catalog_defined_cursor_field: Optional[bool] = Field( + None, + description="Whether the cursor allows users to override the default cursor_field when configuring their connection. The user defined cursor field will be specified from within the configured catalog.", + title="Allow Catalog Defined Cursor Field", + ) start_value: Optional[Union[str, int]] = Field( None, description="The value that determines the earliest record that should be synced.", @@ -1611,6 +1618,11 @@ class DatetimeBasedCursor(BaseModel): examples=["created_at", "{{ config['record_cursor'] }}"], title="Cursor Field", ) + allow_catalog_defined_cursor_field: Optional[bool] = Field( + None, + description="Whether the cursor allows users to override the default cursor_field when configuring their connection. The user defined cursor field will be specified from within the configured catalog.", + title="Allow Catalog Defined Cursor Field", + ) cursor_datetime_formats: Optional[List[str]] = Field( None, description="The possible formats for the cursor field, in order of preference. The first format that matches the cursor field value will be used to parse it. If not provided, the Outgoing Datetime Format will be used.\nUse placeholders starting with \"%\" to describe the format the API is using. 
The following placeholders are available:\n * **%s**: Epoch unix timestamp - `1686218963`\n * **%s_as_float**: Epoch unix timestamp in seconds as float with microsecond precision - `1686218963.123456`\n * **%ms**: Epoch unix timestamp - `1686218963123`\n * **%a**: Weekday (abbreviated) - `Sun`\n * **%A**: Weekday (full) - `Sunday`\n * **%w**: Weekday (decimal) - `0` (Sunday), `6` (Saturday)\n * **%d**: Day of the month (zero-padded) - `01`, `02`, ..., `31`\n * **%b**: Month (abbreviated) - `Jan`\n * **%B**: Month (full) - `January`\n * **%m**: Month (zero-padded) - `01`, `02`, ..., `12`\n * **%y**: Year (without century, zero-padded) - `00`, `01`, ..., `99`\n * **%Y**: Year (with century) - `0001`, `0002`, ..., `9999`\n * **%H**: Hour (24-hour, zero-padded) - `00`, `01`, ..., `23`\n * **%I**: Hour (12-hour, zero-padded) - `01`, `02`, ..., `12`\n * **%p**: AM/PM indicator\n * **%M**: Minute (zero-padded) - `00`, `01`, ..., `59`\n * **%S**: Second (zero-padded) - `00`, `01`, ..., `59`\n * **%f**: Microsecond (zero-padded to 6 digits) - `000000`, `000001`, ..., `999999`\n * **%_ms**: Millisecond (zero-padded to 3 digits) - `000`, `001`, ..., `999`\n * **%z**: UTC offset - `(empty)`, `+0000`, `-04:00`\n * **%Z**: Time zone name - `(empty)`, `UTC`, `GMT`\n * **%j**: Day of the year (zero-padded) - `001`, `002`, ..., `366`\n * **%U**: Week number of the year (Sunday as first day) - `00`, `01`, ..., `53`\n * **%W**: Week number of the year (Monday as first day) - `00`, `01`, ..., `53`\n * **%c**: Date and time representation - `Tue Aug 16 21:30:00 1988`\n * **%x**: Date representation - `08/16/1988`\n * **%X**: Time representation - `21:30:00`\n * **%%**: Literal '%' character\n\n Some placeholders depend on the locale of the underlying system - in most cases this locale is configured as en/US. 
For more information see the [Python documentation](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).\n", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 867c93a22..3a772b691 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1326,11 +1326,23 @@ def create_concurrent_cursor_from_datetime_based_cursor( ) model_parameters = datetime_based_cursor_model.parameters or {} - interpolated_cursor_field = InterpolatedString.create( - datetime_based_cursor_model.cursor_field, - parameters=model_parameters, + + cursor_field = self._get_catalog_defined_cursor_field( + stream_name=stream_name, + allow_catalog_defined_cursor_field=datetime_based_cursor_model.allow_catalog_defined_cursor_field + or False, ) - cursor_field = CursorField(interpolated_cursor_field.eval(config=config)) + + if not cursor_field: + interpolated_cursor_field = InterpolatedString.create( + datetime_based_cursor_model.cursor_field, + parameters=model_parameters, + ) + cursor_field = CursorField( + cursor_field_key=interpolated_cursor_field.eval(config=config), + supports_catalog_defined_cursor_field=datetime_based_cursor_model.allow_catalog_defined_cursor_field + or False, + ) interpolated_partition_field_start = InterpolatedString.create( datetime_based_cursor_model.partition_field_start or "start_time", @@ -1551,11 +1563,22 @@ def create_concurrent_cursor_from_incrementing_count_cursor( else 0 ) - interpolated_cursor_field = InterpolatedString.create( - incrementing_count_cursor_model.cursor_field, - parameters=incrementing_count_cursor_model.parameters or {}, + cursor_field = self._get_catalog_defined_cursor_field( + stream_name=stream_name, + allow_catalog_defined_cursor_field=incrementing_count_cursor_model.allow_catalog_defined_cursor_field + or 
False, ) - cursor_field = CursorField(interpolated_cursor_field.eval(config=config)) + + if not cursor_field: + interpolated_cursor_field = InterpolatedString.create( + incrementing_count_cursor_model.cursor_field, + parameters=incrementing_count_cursor_model.parameters or {}, + ) + cursor_field = CursorField( + cursor_field_key=interpolated_cursor_field.eval(config=config), + supports_catalog_defined_cursor_field=incrementing_count_cursor_model.allow_catalog_defined_cursor_field + or False, + ) connector_state_converter = IncrementingCountStreamStateConverter( is_sequential_state=True, # ConcurrentPerPartitionCursor only works with sequential state @@ -1625,15 +1648,26 @@ def create_concurrent_cursor_from_perpartition_cursor( f"Expected {model_type.__name__} component, but received {datetime_based_cursor_model.__class__.__name__}" ) - interpolated_cursor_field = InterpolatedString.create( - datetime_based_cursor_model.cursor_field, - # FIXME the interfaces of the concurrent cursor are kind of annoying as they take a `ComponentDefinition` instead of the actual model. This was done because the ConcurrentDeclarativeSource didn't have access to the models [here for example](https://github.com/airbytehq/airbyte-python-cdk/blob/f525803b3fec9329e4cc8478996a92bf884bfde9/airbyte_cdk/sources/declarative/concurrent_declarative_source.py#L354C54-L354C91). So now we have two cases: - # * The ComponentDefinition comes from model.__dict__ in which case we have `parameters` - # * The ComponentDefinition comes from the manifest as a dict in which case we have `$parameters` - # We should change those interfaces to use the model once we clean up the code in CDS at which point the parameter propagation should happen as part of the ModelToComponentFactory. 
- parameters=datetime_based_cursor_model.parameters or {}, - ) - cursor_field = CursorField(interpolated_cursor_field.eval(config=config)) + cursor_field = self._get_catalog_defined_cursor_field( + stream_name=stream_name, + allow_catalog_defined_cursor_field=datetime_based_cursor_model.allow_catalog_defined_cursor_field + or False, + ) + + if not cursor_field: + interpolated_cursor_field = InterpolatedString.create( + datetime_based_cursor_model.cursor_field, + # FIXME the interfaces of the concurrent cursor are kind of annoying as they take a `ComponentDefinition` instead of the actual model. This was done because the ConcurrentDeclarativeSource didn't have access to the models [here for example](https://github.com/airbytehq/airbyte-python-cdk/blob/f525803b3fec9329e4cc8478996a92bf884bfde9/airbyte_cdk/sources/declarative/concurrent_declarative_source.py#L354C54-L354C91). So now we have two cases: + # * The ComponentDefinition comes from model.__dict__ in which case we have `parameters` + # * The ComponentDefinition comes from the manifest as a dict in which case we have `$parameters` + # We should change those interfaces to use the model once we clean up the code in CDS at which point the parameter propagation should happen as part of the ModelToComponentFactory. 
def _get_catalog_defined_cursor_field(
    self, stream_name: str, allow_catalog_defined_cursor_field: bool
) -> Optional[CursorField]:
    """
    Resolve the cursor field that the configured catalog selects for a stream.

    :param stream_name: Name of the stream to look up among the configured streams.
    :param allow_catalog_defined_cursor_field: Whether the cursor model permits the configured
        catalog to override its default cursor_field.
    :return: A CursorField keyed on the catalog's single cursor_field entry, or None when the
        override is not allowed, the stream has no configured entry, or the configured entry
        carries no cursor_field. Callers fall back to the cursor model's own cursor_field on None.
    :raises ValueError: If the configured stream lists more than one cursor_field element.
    """
    if not allow_catalog_defined_cursor_field:
        return None

    # Depending on the operation being performed there may be no configured stream yet; an
    # absent entry and an empty cursor_field both mean "use the default from the cursor model".
    configured_stream = self._stream_name_to_configured_stream.get(stream_name)
    catalog_cursor_path = configured_stream.cursor_field if configured_stream else None
    if not catalog_cursor_path:
        return None

    # Nested cursor paths (more than one element) are not supported by this lookup.
    if len(catalog_cursor_path) > 1:
        raise ValueError(
            f"The `{stream_name}` stream does not support nested cursor_field. Please specify only a single cursor_field for the stream in the configured catalog."
        )

    return CursorField(
        cursor_field_key=catalog_cursor_path[0],
        supports_catalog_defined_cursor_field=allow_catalog_defined_cursor_field,
    )
def __init__(
    self, cursor_field_key: str, supports_catalog_defined_cursor_field: bool = False
) -> None:
    """
    Store the cursor key together with its catalog-override flag.

    :param cursor_field_key: Key looked up in a record's data to read the cursor value.
    :param supports_catalog_defined_cursor_field: Flag recording whether the cursor field may be
        overridden from the configured catalog. Defaults to False.
    """
    # The two attributes are independent; both are plain pass-through assignments.
    self.supports_catalog_defined_cursor_field = supports_catalog_defined_cursor_field
    self.cursor_field_key = cursor_field_key
@property
def cursor_field(self) -> Optional[str]:
    """Return the cursor field key of this stream, or None when no cursor field is set."""
    configured_cursor = self._cursor_field
    if not configured_cursor:
        return None
    # Expose only the key so the property keeps returning a plain string.
    return configured_cursor.cursor_field_key
"allow_catalog_defined_cursor_field,catalog_cursor_field,expected_cursor_field", + [ + pytest.param( + True, + "custom_cursor_field", + "custom_cursor_field", + id="test_catalog_defined_cursor_field", + ), + pytest.param( + True, + None, + "updated_at", + id="test_no_catalog_cursor_field_defaults_to_stream_defined_cursor_field", + ), + pytest.param( + False, + "custom_cursor_field", + "updated_at", + id="test_allow_catalog_defined_cursor_field_false_defaults_to_stream_defined_cursor_field", + ), + ], +) +def test_catalog_defined_cursor_field( + allow_catalog_defined_cursor_field, catalog_cursor_field, expected_cursor_field +): + content = """ +selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: ["extractor_path"] +requester: + type: HttpRequester + name: "{{ parameters['name'] }}" + url_base: "https://api.sendgrid.com/v3/" + http_method: "GET" +list_stream: + type: DeclarativeStream + incremental_sync: + type: DatetimeBasedCursor + $parameters: + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + start_datetime: + type: MinMaxDatetime + datetime: "{{ config.get('start_date', '1970-01-01T00:00:00.0Z') }}" + datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ" + cursor_field: "updated_at" + allow_catalog_defined_cursor_field: true + retriever: + type: SimpleRetriever + name: "{{ parameters['name'] }}" + paginator: + type: DefaultPaginator + pagination_strategy: + type: "CursorPagination" + cursor_value: "{{ response._metadata.next }}" + page_size: 10 + requester: + $ref: "#/requester" + path: "/" + record_selector: + $ref: "#/selector" + $parameters: + name: "lists" + """ + + configured_catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name="lists", + json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, + supported_sync_modes=[SyncMode.incremental, SyncMode.full_refresh], + ), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.overwrite, + 
cursor_field=[catalog_cursor_field] if catalog_cursor_field else None, + ) + ] + ) + + model_to_component_factory = ModelToComponentFactory( + configured_catalog=configured_catalog, + ) + + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + stream_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["list_stream"], {} + ) + + stream_manifest["incremental_sync"]["allow_catalog_defined_cursor_field"] = ( + allow_catalog_defined_cursor_field + ) + + stream: DefaultStream = model_to_component_factory.create_component( + model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config + ) + + assert stream.cursor_field == expected_cursor_field + assert stream._cursor_field.cursor_field_key == expected_cursor_field + assert ( + stream._cursor_field.supports_catalog_defined_cursor_field + == allow_catalog_defined_cursor_field + ) + + +def test_catalog_defined_cursor_field_stream_missing(): + content = """ +selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: ["extractor_path"] +requester: + type: HttpRequester + name: "{{ parameters['name'] }}" + url_base: "https://api.sendgrid.com/v3/" + http_method: "GET" +list_stream: + type: DeclarativeStream + incremental_sync: + type: DatetimeBasedCursor + $parameters: + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + start_datetime: + type: MinMaxDatetime + datetime: "{{ config.get('start_date', '1970-01-01T00:00:00.0Z') }}" + datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ" + cursor_field: "updated_at" + allow_catalog_defined_cursor_field: true + retriever: + type: SimpleRetriever + name: "{{ parameters['name'] }}" + paginator: + type: DefaultPaginator + pagination_strategy: + type: "CursorPagination" + cursor_value: "{{ response._metadata.next }}" + page_size: 10 + requester: + $ref: "#/requester" + path: "/" + record_selector: + $ref: "#/selector" + $parameters: + name: "lists" + """ 
+ + configured_catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name="other_stream", + json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, + supported_sync_modes=[SyncMode.incremental, SyncMode.full_refresh], + ), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.overwrite, + cursor_field=["custom_cursor_field"], + ) + ] + ) + + model_to_component_factory = ModelToComponentFactory( + configured_catalog=configured_catalog, + ) + + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + stream_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["list_stream"], {} + ) + + stream: DefaultStream = model_to_component_factory.create_component( + model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config + ) + + assert stream.cursor_field == "updated_at" + assert stream._cursor_field.cursor_field_key == "updated_at" + assert stream._cursor_field.supports_catalog_defined_cursor_field == True + + def get_schema_loader(stream: DefaultStream): assert isinstance( stream._stream_partition_generator._partition_factory._schema_loader, diff --git a/unit_tests/sources/streams/concurrent/test_default_stream.py b/unit_tests/sources/streams/concurrent/test_default_stream.py index fb3428afb..e29492816 100644 --- a/unit_tests/sources/streams/concurrent/test_default_stream.py +++ b/unit_tests/sources/streams/concurrent/test_default_stream.py @@ -8,7 +8,7 @@ from airbyte_cdk.models import AirbyteStream, SyncMode from airbyte_cdk.sources.message import InMemoryMessageRepository -from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor +from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField, FinalStateCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from 
def test_as_airbyte_stream_with_a_catalog_defined_cursor(self):
    """A cursor field that supports catalog overrides is advertised as not source-defined."""
    nullable_string = {"type": ["null", "string"]}
    schema = {
        "type": "object",
        "properties": {
            "id": dict(nullable_string),
            "date": dict(nullable_string),
        },
    }
    overridable_cursor_field = CursorField(
        cursor_field_key="date", supports_catalog_defined_cursor_field=True
    )
    final_state_cursor = FinalStateCursor(
        stream_name=self._name,
        stream_namespace=None,
        message_repository=self._message_repository,
    )

    actual = DefaultStream(
        self._partition_generator,
        self._name,
        schema,
        self._primary_key,
        overridable_cursor_field,
        self._logger,
        final_state_cursor,
    ).as_airbyte_stream()

    # source_defined_cursor is the only field expected to differ from the plain-cursor case.
    expected = AirbyteStream(
        name=self._name,
        json_schema=schema,
        supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
        source_defined_cursor=False,
        default_cursor_field=["date"],
        source_defined_primary_key=None,
        namespace=None,
        is_resumable=True,
        is_file_based=False,
    )
    assert actual == expected