Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,10 @@ definitions:
examples:
- "created_at"
- "{{ config['record_cursor'] }}"
allow_catalog_defined_cursor_field:
title: Allow Catalog Defined Cursor Field
description: Whether the cursor allows users to override the default cursor_field when configuring their connection. The user defined cursor field will be specified from within the configured catalog.
type: boolean
start_value:
title: Start Value
description: The value that determines the earliest record that should be synced.
Expand Down Expand Up @@ -913,6 +917,10 @@ definitions:
examples:
- "created_at"
- "{{ config['record_cursor'] }}"
allow_catalog_defined_cursor_field:
title: Allow Catalog Defined Cursor Field
description: Whether the cursor allows users to override the default cursor_field when configuring their connection. The user defined cursor field will be specified from within the configured catalog.
type: boolean
cursor_datetime_formats:
title: Cursor Datetime Formats
type: array
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -1584,6 +1586,11 @@ class IncrementingCountCursor(BaseModel):
examples=["created_at", "{{ config['record_cursor'] }}"],
title="Cursor Field",
)
allow_catalog_defined_cursor_field: Optional[bool] = Field(
None,
description="Whether the cursor allows users to override the default cursor_field when configuring their connection. The user defined cursor field will be specified from within the configured catalog.",
title="Allow Catalog Defined Cursor Field",
)
start_value: Optional[Union[str, int]] = Field(
None,
description="The value that determines the earliest record that should be synced.",
Expand Down Expand Up @@ -1611,6 +1618,11 @@ class DatetimeBasedCursor(BaseModel):
examples=["created_at", "{{ config['record_cursor'] }}"],
title="Cursor Field",
)
allow_catalog_defined_cursor_field: Optional[bool] = Field(
None,
description="Whether the cursor allows users to override the default cursor_field when configuring their connection. The user defined cursor field will be specified from within the configured catalog.",
title="Allow Catalog Defined Cursor Field",
)
cursor_datetime_formats: Optional[List[str]] = Field(
None,
description="The possible formats for the cursor field, in order of preference. The first format that matches the cursor field value will be used to parse it. If not provided, the Outgoing Datetime Format will be used.\nUse placeholders starting with \"%\" to describe the format the API is using. The following placeholders are available:\n * **%s**: Epoch unix timestamp - `1686218963`\n * **%s_as_float**: Epoch unix timestamp in seconds as float with microsecond precision - `1686218963.123456`\n * **%ms**: Epoch unix timestamp - `1686218963123`\n * **%a**: Weekday (abbreviated) - `Sun`\n * **%A**: Weekday (full) - `Sunday`\n * **%w**: Weekday (decimal) - `0` (Sunday), `6` (Saturday)\n * **%d**: Day of the month (zero-padded) - `01`, `02`, ..., `31`\n * **%b**: Month (abbreviated) - `Jan`\n * **%B**: Month (full) - `January`\n * **%m**: Month (zero-padded) - `01`, `02`, ..., `12`\n * **%y**: Year (without century, zero-padded) - `00`, `01`, ..., `99`\n * **%Y**: Year (with century) - `0001`, `0002`, ..., `9999`\n * **%H**: Hour (24-hour, zero-padded) - `00`, `01`, ..., `23`\n * **%I**: Hour (12-hour, zero-padded) - `01`, `02`, ..., `12`\n * **%p**: AM/PM indicator\n * **%M**: Minute (zero-padded) - `00`, `01`, ..., `59`\n * **%S**: Second (zero-padded) - `00`, `01`, ..., `59`\n * **%f**: Microsecond (zero-padded to 6 digits) - `000000`, `000001`, ..., `999999`\n * **%_ms**: Millisecond (zero-padded to 3 digits) - `000`, `001`, ..., `999`\n * **%z**: UTC offset - `(empty)`, `+0000`, `-04:00`\n * **%Z**: Time zone name - `(empty)`, `UTC`, `GMT`\n * **%j**: Day of the year (zero-padded) - `001`, `002`, ..., `366`\n * **%U**: Week number of the year (Sunday as first day) - `00`, `01`, ..., `53`\n * **%W**: Week number of the year (Monday as first day) - `00`, `01`, ..., `53`\n * **%c**: Date and time representation - `Tue Aug 16 21:30:00 1988`\n * **%x**: Date representation - `08/16/1988`\n * **%X**: Time representation - `21:30:00`\n * **%%**: Literal '%' character\n\n Some placeholders depend on the locale of the underlying system - in most cases this locale is configured as en/US. For more information see the [Python documentation](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1326,11 +1326,23 @@ def create_concurrent_cursor_from_datetime_based_cursor(
)

model_parameters = datetime_based_cursor_model.parameters or {}
interpolated_cursor_field = InterpolatedString.create(
datetime_based_cursor_model.cursor_field,
parameters=model_parameters,

cursor_field = self._get_catalog_defined_cursor_field(
stream_name=stream_name,
allow_catalog_defined_cursor_field=datetime_based_cursor_model.allow_catalog_defined_cursor_field
or False,
)
cursor_field = CursorField(interpolated_cursor_field.eval(config=config))

if not cursor_field:
interpolated_cursor_field = InterpolatedString.create(
datetime_based_cursor_model.cursor_field,
parameters=model_parameters,
)
cursor_field = CursorField(
cursor_field_key=interpolated_cursor_field.eval(config=config),
supports_catalog_defined_cursor_field=datetime_based_cursor_model.allow_catalog_defined_cursor_field
or False,
)

interpolated_partition_field_start = InterpolatedString.create(
datetime_based_cursor_model.partition_field_start or "start_time",
Expand Down Expand Up @@ -1551,11 +1563,22 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
else 0
)

interpolated_cursor_field = InterpolatedString.create(
incrementing_count_cursor_model.cursor_field,
parameters=incrementing_count_cursor_model.parameters or {},
cursor_field = self._get_catalog_defined_cursor_field(
stream_name=stream_name,
allow_catalog_defined_cursor_field=incrementing_count_cursor_model.allow_catalog_defined_cursor_field
or False,
)
cursor_field = CursorField(interpolated_cursor_field.eval(config=config))

if not cursor_field:
interpolated_cursor_field = InterpolatedString.create(
incrementing_count_cursor_model.cursor_field,
parameters=incrementing_count_cursor_model.parameters or {},
)
cursor_field = CursorField(
cursor_field_key=interpolated_cursor_field.eval(config=config),
supports_catalog_defined_cursor_field=incrementing_count_cursor_model.allow_catalog_defined_cursor_field
or False,
)

connector_state_converter = IncrementingCountStreamStateConverter(
is_sequential_state=True, # ConcurrentPerPartitionCursor only works with sequential state
Expand Down Expand Up @@ -1625,15 +1648,26 @@ def create_concurrent_cursor_from_perpartition_cursor(
f"Expected {model_type.__name__} component, but received {datetime_based_cursor_model.__class__.__name__}"
)

interpolated_cursor_field = InterpolatedString.create(
datetime_based_cursor_model.cursor_field,
# FIXME the interfaces of the concurrent cursor are kind of annoying as they take a `ComponentDefinition` instead of the actual model. This was done because the ConcurrentDeclarativeSource didn't have access to the models [here for example](https://github.com/airbytehq/airbyte-python-cdk/blob/f525803b3fec9329e4cc8478996a92bf884bfde9/airbyte_cdk/sources/declarative/concurrent_declarative_source.py#L354C54-L354C91). So now we have two cases:
# * The ComponentDefinition comes from model.__dict__ in which case we have `parameters`
# * The ComponentDefinition comes from the manifest as a dict in which case we have `$parameters`
# We should change those interfaces to use the model once we clean up the code in CDS at which point the parameter propagation should happen as part of the ModelToComponentFactory.
parameters=datetime_based_cursor_model.parameters or {},
)
cursor_field = CursorField(interpolated_cursor_field.eval(config=config))
cursor_field = self._get_catalog_defined_cursor_field(
stream_name=stream_name,
allow_catalog_defined_cursor_field=datetime_based_cursor_model.allow_catalog_defined_cursor_field
or False,
)

if not cursor_field:
interpolated_cursor_field = InterpolatedString.create(
datetime_based_cursor_model.cursor_field,
# FIXME the interfaces of the concurrent cursor are kind of annoying as they take a `ComponentDefinition` instead of the actual model. This was done because the ConcurrentDeclarativeSource didn't have access to the models [here for example](https://github.com/airbytehq/airbyte-python-cdk/blob/f525803b3fec9329e4cc8478996a92bf884bfde9/airbyte_cdk/sources/declarative/concurrent_declarative_source.py#L354C54-L354C91). So now we have two cases:
# * The ComponentDefinition comes from model.__dict__ in which case we have `parameters`
# * The ComponentDefinition comes from the manifest as a dict in which case we have `$parameters`
# We should change those interfaces to use the model once we clean up the code in CDS at which point the parameter propagation should happen as part of the ModelToComponentFactory.
parameters=datetime_based_cursor_model.parameters or {},
)
cursor_field = CursorField(
cursor_field_key=interpolated_cursor_field.eval(config=config),
supports_catalog_defined_cursor_field=datetime_based_cursor_model.allow_catalog_defined_cursor_field
or False,
)

datetime_format = datetime_based_cursor_model.datetime_format

Expand Down Expand Up @@ -2076,9 +2110,11 @@ def create_default_stream(
name=stream_name,
json_schema=schema_loader.get_json_schema,
primary_key=get_primary_key_from_stream(primary_key),
cursor_field=concurrent_cursor.cursor_field.cursor_field_key
cursor_field=concurrent_cursor.cursor_field
if hasattr(concurrent_cursor, "cursor_field")
else "", # FIXME we should have the cursor field has part of the interface of cursor,
else CursorField(
cursor_field_key=""
), # FIXME we should have the cursor field has part of the interface of cursor,
logger=logging.getLogger(f"airbyte.{stream_name}"),
cursor=concurrent_cursor,
supports_file_transfer=hasattr(model, "file_uploader") and bool(model.file_uploader),
Expand Down Expand Up @@ -4293,3 +4329,25 @@ def _ensure_query_properties_to_model(
request_parameters[request_parameter_key] = QueryPropertiesModel.parse_obj(
request_parameter
)

def _get_catalog_defined_cursor_field(
self, stream_name: str, allow_catalog_defined_cursor_field: bool
) -> Optional[CursorField]:
if not allow_catalog_defined_cursor_field:
return None

configured_stream = self._stream_name_to_configured_stream.get(stream_name)

# Depending on the operation is being performed, there may not be a configured stream yet. In this
# case we return None which will then use the default cursor field defined on the cursor model
if not configured_stream or not configured_stream.cursor_field:
return None
elif len(configured_stream.cursor_field) > 1:
raise ValueError(
f"The `{stream_name}` stream does not support nested cursor_field. Please specify only a single cursor_field for the stream in the configured catalog."
)
else:
return CursorField(
cursor_field_key=configured_stream.cursor_field[0],
supports_catalog_defined_cursor_field=allow_catalog_defined_cursor_field,
)
3 changes: 2 additions & 1 deletion airbyte_cdk/sources/file_based/stream/concurrent/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
from airbyte_cdk.sources.streams.concurrent.helpers import (
Expand Down Expand Up @@ -97,7 +98,7 @@ def create_from_stream(
name=stream.name,
json_schema=stream.get_json_schema(),
primary_key=pk,
cursor_field=cursor_field,
cursor_field=CursorField(cursor_field_key=cursor_field) if cursor_field else None,
logger=logger,
namespace=stream.namespace,
cursor=cursor,
Expand Down
4 changes: 2 additions & 2 deletions airbyte_cdk/sources/streams/concurrent/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
from airbyte_cdk.sources.streams.concurrent.helpers import (
Expand Down Expand Up @@ -97,7 +97,7 @@ def create_from_stream(
namespace=stream.namespace,
json_schema=stream.get_json_schema(),
primary_key=pk,
cursor_field=cursor_field,
cursor_field=CursorField(cursor_field_key=cursor_field) if cursor_field else None,
logger=logger,
cursor=cursor,
),
Expand Down
5 changes: 4 additions & 1 deletion airbyte_cdk/sources/streams/concurrent/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ def _extract_value(mapping: Mapping[str, Any], path: List[str]) -> Any:


class CursorField:
def __init__(self, cursor_field_key: str) -> None:
def __init__(
self, cursor_field_key: str, supports_catalog_defined_cursor_field: bool = False
) -> None:
self.cursor_field_key = cursor_field_key
self.supports_catalog_defined_cursor_field = supports_catalog_defined_cursor_field

def extract_value(self, record: Record) -> Any:
cursor_value = record.data.get(self.cursor_field_key)
Expand Down
12 changes: 7 additions & 5 deletions airbyte_cdk/sources/streams/concurrent/default_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
Expand All @@ -21,7 +21,7 @@ def __init__(
name: str,
json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
primary_key: List[str],
cursor_field: Optional[str],
cursor_field: Optional[CursorField],
logger: Logger,
cursor: Cursor,
namespace: Optional[str] = None,
Expand Down Expand Up @@ -50,7 +50,7 @@ def namespace(self) -> Optional[str]:

@property
def cursor_field(self) -> Optional[str]:
return self._cursor_field
return self._cursor_field.cursor_field_key if self._cursor_field else None

def get_json_schema(self) -> Mapping[str, Any]:
return self._json_schema() if callable(self._json_schema) else self._json_schema
Expand All @@ -68,10 +68,12 @@ def as_airbyte_stream(self) -> AirbyteStream:
stream.namespace = self._namespace

if self._cursor_field:
stream.source_defined_cursor = True
stream.source_defined_cursor = (
not self._cursor_field.supports_catalog_defined_cursor_field
)
stream.is_resumable = True
stream.supported_sync_modes.append(SyncMode.incremental)
stream.default_cursor_field = [self._cursor_field]
stream.default_cursor_field = [self._cursor_field.cursor_field_key]

keys = self._primary_key
if keys and len(keys) > 0:
Expand Down
Loading
Loading