Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
065e737
fix(sql): Add fallback to source_defined_primary_key in CatalogProvider
devin-ai-integration[bot] Jun 27, 2025
821848b
refactor(sql): Simplify primary key fallback logic with one-liner
devin-ai-integration[bot] Jun 28, 2025
565aa64
refactor(test): Parametrize catalog provider tests to reduce duplication
devin-ai-integration[bot] Jun 28, 2025
07de856
docs(sql): Expand docstring for get_primary_keys to explain fallback …
devin-ai-integration[bot] Jun 28, 2025
b4aa7df
fix(format): Apply ruff formatting to docstring changes
devin-ai-integration[bot] Jun 28, 2025
65e8e87
feat(sql): Prioritize source_defined_primary_key and return None when…
devin-ai-integration[bot] Jun 28, 2025
be8d806
feat(sql): Add guard statements for primary key validation in merge o…
devin-ai-integration[bot] Jun 29, 2025
8d1ecae
fix(format): Break long docstring line to meet line length requirements
devin-ai-integration[bot] Jun 29, 2025
625cd1e
fix(cherry-pick me): improve messaging for 'could not import module' …
aaronsteers Jul 3, 2025
74240ab
docs(sql): Clarify that get_primary_keys returns column names, not va…
devin-ai-integration[bot] Jul 3, 2025
820d9e8
tests: improve fast test outputs, skip 'read' tests for destinations
aaronsteers Jul 3, 2025
efd9d7d
Apply suggestions from code review
aaronsteers Jul 3, 2025
aefed0d
fix missing import
aaronsteers Jul 3, 2025
0839137
apply suggestion: rename to 'raise_if_errors'
aaronsteers Jul 3, 2025
3d14724
renames (round two)
aaronsteers Jul 3, 2025
623723f
fix missing declaration
aaronsteers Jul 3, 2025
4a23c35
fix lint issue
aaronsteers Jul 4, 2025
6966f6d
fix verify, improve message output
aaronsteers Jul 4, 2025
51e2ad7
more verbose format output
aaronsteers Jul 4, 2025
7bf86ab
Merge branch 'devin/1751064114-fix-primary-key-fallback' of https://g…
aaronsteers Jul 4, 2025
10f39d2
remove extra space
aaronsteers Jul 4, 2025
fdf2273
Merge branch 'devin/1751064114-fix-primary-key-fallback' into aj/test…
aaronsteers Jul 4, 2025
66df838
always test destination-motherduck
aaronsteers Jul 4, 2025
eb0af5d
Merge branch 'main' into aj/tests/improve-fast-test-outputs
aaronsteers Jul 4, 2025
ff9e842
use proper destination test suite class
aaronsteers Jul 4, 2025
b0f28b7
remove unused import
aaronsteers Jul 4, 2025
0d8c317
fix language tags resolution
aaronsteers Jul 4, 2025
0dd3cd9
use absolute path to detect name
aaronsteers Jul 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions airbyte_cdk/sql/shared/catalog_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,18 @@ def get_stream_properties(
def get_primary_keys(
self,
stream_name: str,
) -> list[str]:
"""Return the primary keys for the given stream."""
pks = self.get_configured_stream_info(stream_name).primary_key
) -> list[str] | None:
"""Return the primary keys for the given stream.

We return `source_defined_primary_key` if set, or `primary_key` otherwise. If both are set,
        we assume they should not differ, since Airbyte data integrity constraints do not
permit overruling a source's pre-defined primary keys. If neither is set, we return `None`.
"""
configured_stream = self.get_configured_stream_info(stream_name)
pks = configured_stream.stream.source_defined_primary_key or configured_stream.primary_key

if not pks:
return []
return None

normalized_pks: list[list[str]] = [
[LowerCaseNormalizer.normalize(c) for c in pk] for pk in pks
Expand Down
15 changes: 12 additions & 3 deletions airbyte_cdk/sql/shared/sql_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,9 +666,13 @@ def _merge_temp_table_to_final_table(
"""
nl = "\n"
columns = {self._quote_identifier(c) for c in self._get_sql_column_definitions(stream_name)}
pk_columns = {
self._quote_identifier(c) for c in self.catalog_provider.get_primary_keys(stream_name)
}
primary_keys = self.catalog_provider.get_primary_keys(stream_name)
if not primary_keys:
raise exc.AirbyteInternalError(
message="Cannot merge tables without primary keys. Primary keys are required for merge operations.",
context={"stream_name": stream_name},
)
pk_columns = {self._quote_identifier(c) for c in primary_keys}
non_pk_columns = columns - pk_columns
join_clause = f"{nl} AND ".join(f"tmp.{pk_col} = final.{pk_col}" for pk_col in pk_columns)
set_clause = f"{nl} , ".join(f"{col} = tmp.{col}" for col in non_pk_columns)
Expand Down Expand Up @@ -725,6 +729,11 @@ def _emulated_merge_temp_table_to_final_table(
final_table = self._get_table_by_name(final_table_name)
temp_table = self._get_table_by_name(temp_table_name)
pk_columns = self.catalog_provider.get_primary_keys(stream_name)
if not pk_columns:
raise exc.AirbyteInternalError(
message="Cannot merge tables without primary keys. Primary keys are required for merge operations.",
context={"stream_name": stream_name},
)

columns_to_update: set[str] = self._get_sql_column_definitions(
stream_name=stream_name
Expand Down
50 changes: 50 additions & 0 deletions airbyte_cdk/test/entrypoint_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@
from airbyte_cdk.test.models.scenario import ExpectedOutcome


@dataclass
class AirbyteEntrypointException(Exception):
    """Exception raised for errors in the AirbyteEntrypoint execution.

    Used to provide details of an Airbyte connector execution failure in the output
    captured in an `EntrypointOutput` object. Use `EntrypointOutput.as_exception()` to
    convert it to an exception.

    Example Usage:
        output = EntrypointOutput(...)
        if output.errors:
            raise output.as_exception()
    """

    # Human-readable description of the failure. Declared as a dataclass field so
    # the generated `__init__` accepts the message positionally. Without any field,
    # `@dataclass` would generate a zero-argument `__init__` that shadows
    # `Exception.__init__` and makes `AirbyteEntrypointException(msg)` raise TypeError.
    message: str = ""

    def __str__(self) -> str:
        # Render only the message (Exception.__str__ would show empty `args`,
        # since the dataclass __init__ never populates them).
        return self.message


class EntrypointOutput:
"""A class to encapsulate the output of an Airbyte connector's execution.

Expand All @@ -67,13 +82,15 @@ def __init__(
messages: list[str] | None = None,
uncaught_exception: Optional[BaseException] = None,
*,
command: list[str] | None = None,
message_file: Path | None = None,
) -> None:
if messages is None and message_file is None:
raise ValueError("Either messages or message_file must be provided")
if messages is not None and message_file is not None:
raise ValueError("Only one of messages or message_file can be provided")

self._command = command
self._messages: list[AirbyteMessage] | None = None
self._message_file: Path | None = message_file
if messages:
Expand Down Expand Up @@ -182,6 +199,39 @@ def analytics_messages(self) -> List[AirbyteMessage]:
    def errors(self) -> List[AirbyteMessage]:
        """Return all ERROR-type trace messages captured from the connector run."""
        return self._get_trace_message_by_trace_type(TraceType.ERROR)

def get_formatted_error_message(self) -> str:
"""Returns a human-readable error message with the contents.

If there are no errors, returns an empty string.
"""
errors = self.errors
if not errors:
# If there are no errors, return an empty string.
return ""

result = "Failed to run airbyte command"
result += ": " + " ".join(self._command) if self._command else "."
result += "\n" + "\n".join(
[str(error.trace.error).replace("\\n", "\n") for error in errors if error.trace],
)
return result

def as_exception(self) -> AirbyteEntrypointException:
"""Convert the output to an exception."""
return AirbyteEntrypointException(self.get_formatted_error_message())

def raise_if_errors(
self,
) -> None:
"""Raise an exception if there are errors in the output.

Otherwise, do nothing.
"""
if not self.errors:
return None

raise self.as_exception()

@property
def catalog(self) -> AirbyteMessage:
catalog = self.get_message_by_types([Type.CATALOG])
Expand Down
29 changes: 4 additions & 25 deletions airbyte_cdk/test/standard_tests/_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,6 @@
)


def _errors_to_str(
    entrypoint_output: entrypoint_wrapper.EntrypointOutput,
) -> str:
    """Render the errors of an entrypoint output as a single printable string.

    Returns an empty string when the output contains no errors. Otherwise,
    returns a leading newline followed by each trace error's string form,
    one per line, with escaped "\\n" sequences expanded into real newlines
    for readability. Errors without a `trace` payload are skipped.
    """
    if not entrypoint_output.errors:
        # If there are no errors, return an empty string.
        return ""

    return "\n" + "\n".join(
        [
            str(error.trace.error).replace(
                "\\n",
                "\n",
            )
            for error in entrypoint_output.errors
            if error.trace
        ],
    )


@runtime_checkable
class IConnector(Protocol):
"""A connector that can be run in a test scenario.
Expand Down Expand Up @@ -125,9 +105,7 @@ def run_test_job(
expected_outcome=test_scenario.expected_outcome,
)
if result.errors and test_scenario.expected_outcome.expect_success():
raise AssertionError(
f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result)
)
raise result.as_exception()

if verb == "check":
# Check is expected to fail gracefully without an exception.
Expand All @@ -137,7 +115,7 @@ def run_test_job(
"Expected exactly one CONNECTION_STATUS message. Got "
f"{len(result.connection_status_messages)}:\n"
+ "\n".join([str(msg) for msg in result.connection_status_messages])
+ _errors_to_str(result)
+ result.get_formatted_error_message()
)
if test_scenario.expected_outcome.expect_exception():
conn_status = result.connection_status_messages[0].connectionStatus
Expand All @@ -161,7 +139,8 @@ def run_test_job(

if test_scenario.expected_outcome.expect_success():
assert not result.errors, (
f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result)
f"Test job failed with {len(result.errors)} error(s): \n"
+ result.get_formatted_error_message()
)

return result
14 changes: 11 additions & 3 deletions airbyte_cdk/test/standard_tests/connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ def connector(cls) -> type[IConnector] | Callable[[], IConnector] | None:
This assumes a python connector and should be overridden by subclasses to provide the
specific connector class to be tested.
"""
connector_root = cls.get_connector_root_dir()
connector_name = connector_root.absolute().name
connector_name = cls.connector_name

expected_module_name = connector_name.replace("-", "_").lower()
expected_class_name = connector_name.replace("-", "_").title().replace("_", "")
Expand All @@ -59,7 +58,16 @@ def connector(cls) -> type[IConnector] | Callable[[], IConnector] | None:
try:
module = importlib.import_module(expected_module_name)
except ModuleNotFoundError as e:
raise ImportError(f"Could not import module '{expected_module_name}'.") from e
raise ImportError(
f"Could not import module '{expected_module_name}'. "
"Please ensure you are running from within the connector's virtual environment, "
"for instance by running `poetry run airbyte-cdk connector test` from the "
"connector directory. If the issue persists, check that the connector "
f"module matches the expected module name '{expected_module_name}' and that the "
f"connector class matches the expected class name '{expected_class_name}'. "
"Alternatively, you can run `airbyte-cdk image test` to run a subset of tests "
"against the connector's image."
) from e
finally:
# Change back to the original working directory
os.chdir(cwd_snapshot)
Expand Down
95 changes: 39 additions & 56 deletions airbyte_cdk/test/standard_tests/docker_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,18 @@
DestinationSyncMode,
SyncMode,
)
from airbyte_cdk.models.airbyte_protocol_serializers import (
AirbyteCatalogSerializer,
AirbyteStreamSerializer,
)
from airbyte_cdk.models.connector_metadata import MetadataFile
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.models import ConnectorTestScenario
from airbyte_cdk.test.utils.reading import catalog
from airbyte_cdk.utils.connector_paths import (
ACCEPTANCE_TEST_CONFIG,
find_connector_root,
)
from airbyte_cdk.utils.docker import build_connector_image, run_docker_command
from airbyte_cdk.utils.docker import (
build_connector_image,
run_docker_airbyte_command,
run_docker_command,
)


class DockerConnectorTestSuite:
Expand All @@ -55,6 +54,17 @@ def get_connector_root_dir(cls) -> Path:
"""Get the root directory of the connector."""
return find_connector_root([cls.get_test_class_dir(), Path.cwd()])

@classproperty
def connector_name(self) -> str:
"""Get the name of the connector."""
connector_root = self.get_connector_root_dir()
return connector_root.absolute().name

@classmethod
def is_destination_connector(cls) -> bool:
"""Check if the connector is a destination."""
return cls.connector_name.startswith("destination-")

@classproperty
def acceptance_test_config_path(cls) -> Path:
"""Get the path to the acceptance test config file."""
Expand Down Expand Up @@ -145,23 +155,16 @@ def test_docker_image_build_and_spec(
no_verify=False,
)

try:
result: CompletedProcess[str] = run_docker_command(
[
"docker",
"run",
"--rm",
connector_image,
"spec",
],
check=True, # Raise an error if the command fails
capture_stderr=True,
capture_stdout=True,
)
except SubprocessError as ex:
raise AssertionError(
f"Failed to run `spec` command in docker image {connector_image!r}. Error: {ex!s}"
) from None
_ = run_docker_airbyte_command(
[
"docker",
"run",
"--rm",
connector_image,
"spec",
],
check=True,
)

@pytest.mark.skipif(
shutil.which("docker") is None,
Expand Down Expand Up @@ -203,7 +206,7 @@ def test_docker_image_build_and_check(
with scenario.with_temp_config_file(
connector_root=connector_root,
) as temp_config_file:
_ = run_docker_command(
_ = run_docker_airbyte_command(
[
"docker",
"run",
Expand All @@ -215,9 +218,7 @@ def test_docker_image_build_and_check(
"--config",
container_config_path,
],
check=True, # Raise an error if the command fails
capture_stderr=True,
capture_stdout=True,
check=True,
)

@pytest.mark.skipif(
Expand All @@ -242,6 +243,9 @@ def test_docker_image_build_and_read(
the local docker image cache using `docker image prune -a` command.
- If the --connector-image arg is provided, it will be used instead of building the image.
"""
if self.is_destination_connector():
pytest.skip("Skipping read test for destination connector.")

if scenario.expected_outcome.expect_exception():
pytest.skip("Skipping (expected to fail).")

Expand Down Expand Up @@ -295,7 +299,7 @@ def test_docker_image_build_and_read(
) as temp_dir_str,
):
temp_dir = Path(temp_dir_str)
discover_result = run_docker_command(
discover_result = run_docker_airbyte_command(
[
"docker",
"run",
Expand All @@ -307,20 +311,12 @@ def test_docker_image_build_and_read(
"--config",
container_config_path,
],
check=True, # Raise an error if the command fails
capture_stderr=True,
capture_stdout=True,
check=True,
)
parsed_output = EntrypointOutput(messages=discover_result.stdout.splitlines())
try:
catalog_message = parsed_output.catalog # Get catalog message
assert catalog_message.catalog is not None, "Catalog message missing catalog."
discovered_catalog: AirbyteCatalog = parsed_output.catalog.catalog
except Exception as ex:
raise AssertionError(
f"Failed to load discovered catalog from {discover_result.stdout}. "
f"Error: {ex!s}"
) from None

catalog_message = discover_result.catalog # Get catalog message
assert catalog_message.catalog is not None, "Catalog message missing catalog."
discovered_catalog: AirbyteCatalog = catalog_message.catalog
if not discovered_catalog.streams:
raise ValueError(
f"Discovered catalog for connector '{connector_name}' is empty. "
Expand Down Expand Up @@ -355,7 +351,7 @@ def test_docker_image_build_and_read(
configured_catalog_path.write_text(
orjson.dumps(asdict(configured_catalog)).decode("utf-8")
)
read_result: CompletedProcess[str] = run_docker_command(
read_result: EntrypointOutput = run_docker_airbyte_command(
[
"docker",
"run",
Expand All @@ -371,18 +367,5 @@ def test_docker_image_build_and_read(
"--catalog",
container_catalog_path,
],
check=False,
capture_stderr=True,
capture_stdout=True,
check=True,
)
if read_result.returncode != 0:
raise AssertionError(
f"Failed to run `read` command in docker image {connector_image!r}. "
"\n-----------------"
f"EXIT CODE: {read_result.returncode}\n"
"STDERR:\n"
f"{read_result.stderr}\n"
f"STDOUT:\n"
f"{read_result.stdout}\n"
"\n-----------------"
) from None
Loading
Loading