Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def create_source(
limits: TestLimits,
catalog: Optional[ConfiguredAirbyteCatalog],
state: Optional[List[AirbyteStateMessage]],
) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]:
) -> ConcurrentDeclarativeSource:
manifest = config["__injected_declarative_manifest"]

# We enforce a concurrency level of 1 so that the stream is processed on a single thread
Expand All @@ -88,7 +88,7 @@ def create_source(


def read_stream(
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
source: ConcurrentDeclarativeSource,
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
state: List[AirbyteStateMessage],
Expand Down Expand Up @@ -127,7 +127,7 @@ def read_stream(


def resolve_manifest(
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
source: ConcurrentDeclarativeSource,
) -> AirbyteMessage:
try:
return AirbyteMessage(
Expand All @@ -146,7 +146,7 @@ def resolve_manifest(


def full_resolve_manifest(
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], limits: TestLimits
source: ConcurrentDeclarativeSource, limits: TestLimits
) -> AirbyteMessage:
try:
manifest = {**source.resolved_manifest}
Expand Down
2 changes: 1 addition & 1 deletion airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def get_config_and_catalog_from_args(


def handle_connector_builder_request(
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
source: ConcurrentDeclarativeSource,
command: str,
config: Mapping[str, Any],
catalog: Optional[ConfiguredAirbyteCatalog],
Expand Down
4 changes: 2 additions & 2 deletions airbyte_cdk/connector_builder/test_reader/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def __init__(

def run_test_read(
self,
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
source: ConcurrentDeclarativeSource,
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
stream_name: str,
Expand Down Expand Up @@ -383,7 +383,7 @@ def _get_latest_config_update(

def _read_stream(
self,
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
source: ConcurrentDeclarativeSource,
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
state: List[AirbyteStateMessage],
Expand Down
6 changes: 2 additions & 4 deletions airbyte_cdk/manifest_server/command_processor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@


class ManifestCommandProcessor:
_source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]
_source: ConcurrentDeclarativeSource
_logger = logging.getLogger("airbyte.manifest-server")

def __init__(
self, source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]
) -> None:
def __init__(self, source: ConcurrentDeclarativeSource) -> None:
self._source = source

def test_read(
Expand Down
2 changes: 1 addition & 1 deletion airbyte_cdk/manifest_server/command_processor/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def build_source(
record_limit: Optional[int] = None,
page_limit: Optional[int] = None,
slice_limit: Optional[int] = None,
) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]:
) -> ConcurrentDeclarativeSource:
# We enforce a concurrency level of 1 so that the stream is processed on a single thread
# to retain ordering for the grouping of the builder message responses.
definition = copy.deepcopy(manifest)
Expand Down
2 changes: 1 addition & 1 deletion airbyte_cdk/manifest_server/routers/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def safe_build_source(
page_limit: Optional[int] = None,
slice_limit: Optional[int] = None,
record_limit: Optional[int] = None,
) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]:
) -> ConcurrentDeclarativeSource:
"""Wrapper around build_source that converts ValidationError to HTTPException."""
try:
return build_source(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def _get_declarative_component_schema() -> Dict[str, Any]:
# is no longer inherited from since the only external dependency is from that class.
#
# todo: It is worth investigating removal of the Generic[TState] since it will always be Optional[List[AirbyteStateMessage]]
class ConcurrentDeclarativeSource(AbstractSource, Generic[TState]):
class ConcurrentDeclarativeSource(AbstractSource):
# By default, we defer to a value of 2. A value lower than could cause a PartitionEnqueuer to be stuck in a state of deadlock
# because it has hit the limit of futures but not partition reader is consuming them.
_LOWEST_SAFE_CONCURRENCY_LEVEL = 2
Expand All @@ -171,8 +171,8 @@ def __init__(
self,
catalog: Optional[ConfiguredAirbyteCatalog],
config: Optional[Mapping[str, Any]],
state: TState,
source_config: ConnectionDefinition,
state: Optional[List[AirbyteStateMessage]] = None,
debug: bool = False,
emit_connector_builder_messages: bool = False,
migrate_manifest: bool = False,
Expand Down
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/declarative/yaml_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from airbyte_cdk.sources.types import ConnectionDefinition


class YamlDeclarativeSource(ConcurrentDeclarativeSource[List[AirbyteStateMessage]]):
class YamlDeclarativeSource(ConcurrentDeclarativeSource):
"""Declarative source defined by a yaml file"""

def __init__(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2219,7 +2219,12 @@ def _run_read(manifest: Mapping[str, Any], stream_name: str) -> List[AirbyteMess
)
config = {}
state = {}
source = ConcurrentDeclarativeSource(catalog, config, state, manifest)
source = ConcurrentDeclarativeSource(
catalog=catalog,
config=config,
source_config=manifest,
state=state,
)
return list(source.read(logger, {}, catalog, state))


Expand Down
Loading