10 changes: 6 additions & 4 deletions .github/workflows/python_lint.yml
@@ -1,4 +1,6 @@
name: Linters
permissions:
contents: read

on:
push:
@@ -53,8 +55,8 @@ jobs:
- name: Check code format
run: poetry run ruff format --diff .

mypy-check:
name: MyPy Check
pyrefly-check:
name: Python Type Checking
runs-on: ubuntu-24.04
steps:
# Common steps:
@@ -74,5 +76,5 @@ jobs:

# Job-specific step(s):

- name: Run mypy
run: poetry run mypy --config-file mypy.ini airbyte_cdk
- name: Run pyrefly
run: poetry run pyrefly check airbyte_cdk
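Note: the files below silence individual pyrefly findings with inline `# pyrefly: ignore # <error-code>` comments placed on the line above the flagged expression. A minimal sketch of the pattern (hypothetical class, not CDK code; the next-line scoping is assumed from how the comments are used throughout this diff):

class GeneratedSecret:
    """Hypothetical stand-in for a generated API type whose attributes
    the type checker cannot see because they are set dynamically."""

    def __init__(self) -> None:
        setattr(self, "name", "projects/my-project/secrets/SECRET_X")


def secret_id(secret: GeneratedSecret) -> str:
    # The attribute exists at runtime but not on the declared type,
    # so the single finding on the next line is suppressed.
    # pyrefly: ignore # missing-attribute
    return secret.name.split("/")[-1]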
8 changes: 8 additions & 0 deletions airbyte_cdk/cli/airbyte_cdk/_secrets.py
@@ -261,12 +261,15 @@ def list_(
table.add_column("Labels", justify="left", style="magenta", overflow="fold")
table.add_column("Created", justify="left", style="blue", overflow="fold")
for secret in secrets:
# pyrefly: ignore # missing-attribute
full_secret_name = secret.name
secret_name = _extract_secret_name(full_secret_name)
secret_url = _get_secret_url(secret_name, gcp_project_id)
table.add_row(
f"[link={secret_url}]{secret_name}[/link]",
# pyrefly: ignore # missing-attribute
"\n".join([f"{k}={v}" for k, v in secret.labels.items()]),
# pyrefly: ignore # missing-attribute
str(secret.create_time),
)

@@ -360,6 +363,7 @@ def _write_secret_file(
"""
# List all enabled versions of the secret.
response = client.list_secret_versions(
# pyrefly: ignore # missing-attribute
request={"parent": secret.name, "filter": "state:ENABLED"}
)

@@ -368,6 +372,7 @@
versions = list(response)

if not versions:
# pyrefly: ignore # missing-attribute
secret_name = _extract_secret_name(secret.name)
raise ConnectorSecretWithNoValidVersionsError(
connector_name=connector_name,
@@ -404,7 +409,9 @@ def _get_secret_filepath(
secret: Secret, # type: ignore
) -> Path:
"""Get the file path for a secret based on its labels."""
# pyrefly: ignore # missing-attribute, unsupported-operation
if secret.labels and "filename" in secret.labels:
# pyrefly: ignore # index-error
return secrets_dir / f"{secret.labels['filename']}.json"

return secrets_dir / "config.json" # Default filename
@@ -468,6 +475,7 @@ def _print_ci_secrets_masks_for_config(
_print_ci_secrets_masks_for_config(item)

if isinstance(config, dict):
# pyrefly: ignore # bad-assignment
for key, value in config.items():
if _is_secret_property(key):
logger.debug(f"Masking secret for config key: {key}")
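The label-based routing in `_get_secret_filepath` above reduces to a small pure function; a sketch of the same logic for reference (names are illustrative, not the CDK API):

from pathlib import Path
from typing import Mapping, Optional


def secret_filepath(secrets_dir: Path, labels: Optional[Mapping[str, str]]) -> Path:
    # A "filename" label picks the target file; everything else
    # defaults to config.json, mirroring the diff above.
    if labels and "filename" in labels:
        return secrets_dir / f"{labels['filename']}.json"
    return secrets_dir / "config.json"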
2 changes: 2 additions & 0 deletions airbyte_cdk/config_observation.py
@@ -67,6 +67,7 @@ class ConfigObserver:
"""

def set_config(self, config: ObservedDict) -> None:
# pyrefly: ignore # implicitly-defined-attribute
self.config = config

def update(self) -> None:
@@ -99,6 +100,7 @@ def create_connector_config_control_message(config: MutableMapping[str, Any]) ->
control_message = AirbyteControlMessage(
type=OrchestratorType.CONNECTOR_CONFIG,
emitted_at=time.time() * 1000,
# pyrefly: ignore # bad-argument-type
connectorConfig=AirbyteControlConnectorConfigMessage(config=config),
)
return AirbyteMessage(type=Type.CONTROL, control=control_message)
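The `implicitly-defined-attribute` finding above comes from an attribute first assigned outside `__init__`. Where a suppression is not wanted, a class-level annotation declares the attribute explicitly; a minimal sketch under that assumption (hypothetical class, not the CDK's ConfigObserver):

from typing import Any, MutableMapping


class ConfigHolder:
    # Declaring the attribute up front means the assignment in
    # set_config() no longer defines it implicitly.
    config: MutableMapping[str, Any]

    def set_config(self, config: MutableMapping[str, Any]) -> None:
        self.config = config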
1 change: 1 addition & 0 deletions airbyte_cdk/connector_builder/test_reader/reader.py
@@ -249,6 +249,7 @@ def _check_record_limit(self, record_limit: Optional[int] = None) -> int:
if record_limit is None:
record_limit = self._max_record_limit
else:
# pyrefly: ignore # no-matching-overload
record_limit = min(record_limit, self._max_record_limit)

return record_limit
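The `no-matching-overload` suppression on the `min()` call is consistent with one of its arguments being Optional (assuming `_max_record_limit` can be None here; its typing is not shown in the hunk). Explicit narrowing would satisfy the checker instead; a sketch under that assumption:

from typing import Optional


def clamp_record_limit(
    record_limit: Optional[int], max_record_limit: Optional[int]
) -> Optional[int]:
    if record_limit is None:
        return max_record_limit
    if max_record_limit is None:
        # Nothing to clamp against; both min() arguments must be int.
        return record_limit
    return min(record_limit, max_record_limit)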
11 changes: 11 additions & 0 deletions airbyte_cdk/destinations/vector_db_based/config.py
@@ -24,6 +24,7 @@ class SeparatorSplitterConfigModel(BaseModel):
description="Whether to keep the separator in the resulting chunks",
)

# pyrefly: ignore # bad-override
class Config(OneOfOptionConfig):
title = "By Separator"
description = "Split the text by the list of separators until the chunk size is reached, using the earlier mentioned separators where possible. This is useful for splitting text fields by paragraphs, sentences, words, etc."
@@ -40,6 +41,7 @@ class MarkdownHeaderSplitterConfigModel(BaseModel):
ge=1,
)

# pyrefly: ignore # bad-override
class Config(OneOfOptionConfig):
title = "By Markdown header"
description = "Split the text by Markdown headers down to the specified header level. If the chunk size fits multiple sections, they will be combined into a single chunk."
@@ -71,6 +73,7 @@ class CodeSplitterConfigModel(BaseModel):
],
)

# pyrefly: ignore # bad-override
class Config(OneOfOptionConfig):
title = "By Programming Language"
description = "Split the text by suitable delimiters based on the programming language. This is useful for splitting code into chunks."
@@ -129,6 +132,7 @@ class ProcessingConfigModel(BaseModel):
description="List of fields to rename. Not applicable for nested fields, but can be used to rename fields already flattened via dot notation.",
)

# pyrefly: ignore # bad-override
class Config:
schema_extra = {"group": "processing"}

@@ -137,6 +141,7 @@ class OpenAIEmbeddingConfigModel(BaseModel):
mode: Literal["openai"] = Field("openai", const=True)
openai_key: str = Field(..., title="OpenAI API key", airbyte_secret=True)

# pyrefly: ignore # bad-override
class Config(OneOfOptionConfig):
title = "OpenAI"
description = "Use the OpenAI API to embed text. This option is using the text-embedding-ada-002 model with 1536 embedding dimensions."
@@ -164,6 +169,7 @@ class OpenAICompatibleEmbeddingConfigModel(BaseModel):
examples=[1536, 384],
)

# pyrefly: ignore # bad-override
class Config(OneOfOptionConfig):
title = "OpenAI-compatible"
description = "Use a service that's compatible with the OpenAI API to embed text."
@@ -191,6 +197,7 @@ class AzureOpenAIEmbeddingConfigModel(BaseModel):
examples=["your-resource-name"],
)

# pyrefly: ignore # bad-override
class Config(OneOfOptionConfig):
title = "Azure OpenAI"
description = "Use the Azure-hosted OpenAI API to embed text. This option is using the text-embedding-ada-002 model with 1536 embedding dimensions."
@@ -200,6 +207,7 @@ class Config(OneOfOptionConfig):
class FakeEmbeddingConfigModel(BaseModel):
mode: Literal["fake"] = Field("fake", const=True)

# pyrefly: ignore # bad-override
class Config(OneOfOptionConfig):
title = "Fake"
description = "Use a fake embedding made out of random vectors with 1536 embedding dimensions. This is useful for testing the data pipeline without incurring any costs."
@@ -221,6 +229,7 @@ class FromFieldEmbeddingConfigModel(BaseModel):
examples=[1536, 384],
)

# pyrefly: ignore # bad-override
class Config(OneOfOptionConfig):
title = "From Field"
description = "Use a field in the record as the embedding. This is useful if you already have an embedding for your data and want to store it in the vector store."
@@ -231,6 +240,7 @@ class CohereEmbeddingConfigModel(BaseModel):
mode: Literal["cohere"] = Field("cohere", const=True)
cohere_key: str = Field(..., title="Cohere API key", airbyte_secret=True)

# pyrefly: ignore # bad-override
class Config(OneOfOptionConfig):
title = "Cohere"
description = "Use the Cohere API to embed text."
@@ -273,6 +283,7 @@ class VectorDBConfigModel(BaseModel):
description="Do not store the text that gets embedded along with the vector and the metadata in the destination. If set to true, only the vector and the metadata will be stored - in this case raw text for LLM use cases needs to be retrieved from another source.",
)

# pyrefly: ignore # bad-override
class Config:
title = "Destination Config"
schema_extra = {
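Every `bad-override` suppression in this file sits on a pydantic v1 style nested `Config` class. pyrefly appears to flag the nested class because it replaces `BaseModel.Config` without inheriting from it; the pattern itself is standard pydantic v1. A reduced sketch (simplified base, hypothetical model):

from typing import Literal

from pydantic import BaseModel, Field  # pydantic v1 API, matching this module


class OneOfOptionConfig:
    """Simplified stand-in for the CDK's OneOfOptionConfig base."""

    title: str
    description: str


class FakeEmbeddingModel(BaseModel):
    mode: Literal["fake"] = Field("fake", const=True)

    # pyrefly: ignore # bad-override
    class Config(OneOfOptionConfig):
        title = "Fake"
        description = "A test-only embedding option."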
airbyte_cdk/destinations/vector_db_based/document_processor.py
@@ -71,6 +71,7 @@ class DocumentProcessor:
@staticmethod
def check_config(config: ProcessingConfigModel) -> Optional[str]:
if config.text_splitter is not None and config.text_splitter.mode == "separator":
# pyrefly: ignore # missing-attribute
for s in config.text_splitter.separators:
try:
separator = json.loads(s)
@@ -85,21 +86,25 @@ def _get_text_splitter(
chunk_size: int,
chunk_overlap: int,
splitter_config: Optional[TextSplitterConfigModel],
# pyrefly: ignore # bad-return
) -> RecursiveCharacterTextSplitter:
if splitter_config is None:
splitter_config = SeparatorSplitterConfigModel(mode="separator")
if splitter_config.mode == "separator":
return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
# pyrefly: ignore # missing-attribute
separators=[json.loads(s) for s in splitter_config.separators],
# pyrefly: ignore # missing-attribute
keep_separator=splitter_config.keep_separator,
disallowed_special=(),
)
if splitter_config.mode == "markdown":
return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
# pyrefly: ignore # missing-attribute
separators=headers_to_split_on[: splitter_config.split_level],
is_separator_regex=True,
keep_separator=True,
@@ -110,6 +115,7 @@ def _get_text_splitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=RecursiveCharacterTextSplitter.get_separators_for_language(
# pyrefly: ignore # bad-argument-type, missing-attribute
Language(splitter_config.language)
),
disallowed_special=(),
@@ -218,6 +224,7 @@ def _remap_field_names(self, fields: Dict[str, Any]) -> Dict[str, Any]:
new_fields = fields.copy()
for mapping in self.field_name_mappings:
if mapping.from_field in new_fields:
# pyrefly: ignore # missing-attribute
new_fields[mapping.to_field] = new_fields.pop(mapping.from_field)

return new_fields
5 changes: 5 additions & 0 deletions airbyte_cdk/destinations/vector_db_based/writer.py
@@ -42,8 +42,11 @@ def __init__(
self._init_batch()

def _init_batch(self) -> None:
# pyrefly: ignore # implicitly-defined-attribute
self.chunks: Dict[Tuple[str, str], List[Chunk]] = defaultdict(list)
# pyrefly: ignore # implicitly-defined-attribute
self.ids_to_delete: Dict[Tuple[str, str], List[str]] = defaultdict(list)
# pyrefly: ignore # implicitly-defined-attribute
self.number_of_chunks = 0

def _convert_to_document(self, chunk: Chunk) -> Document:
@@ -73,6 +76,7 @@ def _process_batch(self) -> None:
def write(
self, configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:
# pyrefly: ignore # implicitly-defined-attribute
self.processor = DocumentProcessor(self.processing_config, configured_catalog)
self.indexer.pre_sync(configured_catalog)
for message in input_messages:
@@ -82,6 +86,7 @@ def write(
self._process_batch()
yield message
elif message.type == Type.RECORD:
# pyrefly: ignore # bad-argument-type
record_chunks, record_id_to_delete = self.processor.process(message.record)
self.chunks[
( # type: ignore [index] # expected "tuple[str, str]", got "tuple[str | Any | None, str | Any]"
4 changes: 4 additions & 0 deletions airbyte_cdk/entrypoint.py
@@ -363,7 +363,9 @@ def extract_config(cls, args: List[str]) -> Optional[Any]:
return None

def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]:
# pyrefly: ignore # missing-attribute
if hasattr(source, "message_repository") and source.message_repository:
# pyrefly: ignore # missing-attribute
yield from source.message_repository.consume_queue()
return

@@ -373,6 +375,7 @@ def launch(source: Source, args: List[str]) -> None:
parsed_args = source_entrypoint.parse_args(args)
# temporarily removes the PrintBuffer because we're seeing weird print behavior for concurrent syncs
# Refer to: https://github.com/airbytehq/oncall/issues/6235
# pyrefly: ignore # bad-context-manager
with PRINT_BUFFER:
for message in source_entrypoint.run(parsed_args):
# simply printing is creating issues for concurrent CDK as Python uses two different instructions to print: one for the message and
@@ -388,6 +391,7 @@ def _init_internal_request_filter() -> None:

@wraps(wrapped_fn)
def filtered_send(self: Any, request: PreparedRequest, **kwargs: Any) -> Response:
# pyrefly: ignore # no-matching-overload
parsed_url = urlparse(request.url)

if parsed_url.scheme not in VALID_URL_SCHEMES:
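The `no-matching-overload` on `urlparse(request.url)` matches `PreparedRequest.url` being annotated `Optional[str]` in requests' stubs while `urlparse` expects `str` (an assumption based on the error code; the typing is not shown in the hunk). Narrowing first avoids the suppression; a sketch:

from typing import Optional
from urllib.parse import urlparse


def scheme_of(url: Optional[str]) -> Optional[str]:
    # Narrow away None before handing the URL to urlparse,
    # whose overloads accept str or bytes but not None.
    if url is None:
        return None
    return urlparse(url).scheme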
7 changes: 6 additions & 1 deletion airbyte_cdk/logger.py
@@ -79,7 +79,11 @@ def format(self, record: logging.LogRecord) -> str:
message = super().format(record)
message = filter_secrets(message)
log_message = AirbyteMessage(
type=Type.LOG, log=AirbyteLogMessage(level=airbyte_level, message=message)
type=Type.LOG,
log=AirbyteLogMessage(
level=airbyte_level, # pyrefly: ignore # bad-argument-type
message=message,
),
)
return orjson.dumps(AirbyteMessageSerializer.dump(log_message)).decode()

@@ -100,6 +104,7 @@ def log_by_prefix(msg: str, default_level: str) -> Tuple[int, str]:
split_line = msg.split()
first_word = next(iter(split_line), None)
if first_word in valid_log_types:
# pyrefly: ignore # no-matching-overload
log_level = logging.getLevelName(first_word)
rendered_message = " ".join(split_line[1:])
else:
1 change: 1 addition & 0 deletions airbyte_cdk/sources/__init__.py
@@ -17,6 +17,7 @@
# might not need dpath leading to longer initialization time.
# There is a downside in using dpath as a library since the options are global: if we have two pieces of code that want different options,
# this will not be thread-safe.
# pyrefly: ignore # bad-assignment
dpath.options.ALLOW_EMPTY_STRING_KEYS = True

__all__ = [
1 change: 1 addition & 0 deletions airbyte_cdk/sources/concurrent_source/concurrent_source.py
@@ -53,6 +53,7 @@ def create(
"It is required to have more workers than threads generating partitions"
)
threadpool = ThreadPoolManager(
# pyrefly: ignore # implicit-import
concurrent.futures.ThreadPoolExecutor(
max_workers=num_workers, thread_name_prefix="workerpool"
),
@@ -99,6 +99,7 @@ def convert_to_concurrent_stream(

stream.cursor = cursor # type: ignore[assignment] # cursor is of type ConcurrentCursor, which inherits from Cursor
if hasattr(stream, "parent"):
# pyrefly: ignore # missing-attribute
stream.parent.cursor = cursor
else:
cursor = FinalStateCursor(
@@ -168,6 +168,7 @@ def __init__(
where the child has taken up all the job budget without room for the parent to create more, which would lead to an infinite loop of
"trying to start a parent job" and "ConcurrentJobLimitReached".
"""
# pyrefly: ignore # not-iterable
if {*AsyncJobStatus} != self._KNOWN_JOB_STATUSES:
# this is to prevent developers updating the possible statuses without updating the logic of this class
raise ValueError(
3 changes: 3 additions & 0 deletions airbyte_cdk/sources/declarative/auth/oauth.py
@@ -191,6 +191,7 @@ def get_client_id_name(self) -> str:
return self._client_id_name.eval(self.config) # type: ignore # eval returns a string in this context

def get_client_id(self) -> str:
# pyrefly: ignore # missing-attribute
client_id = self._client_id.eval(self.config) if self._client_id else self._client_id
if not client_id:
raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
@@ -201,6 +202,7 @@ def get_client_secret_name(self) -> str:

def get_client_secret(self) -> str:
client_secret = (
# pyrefly: ignore # missing-attribute
self._client_secret.eval(self.config) if self._client_secret else self._client_secret
)
if not client_secret:
@@ -270,6 +272,7 @@ def build_refresh_request_body(self) -> Mapping[str, Any]:
return super().build_refresh_request_body()

@property
# pyrefly: ignore # bad-override
def access_token(self) -> str:
if self._access_token is None:
raise ValueError("access_token is not set")
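The `bad-override` on `access_token` looks like the property-over-attribute shape mismatch: the declarative authenticator exposes a read-only property where its base class models the token differently (assumed from the error code; the base definition is not in this hunk). A reduced sketch of that shape (hypothetical classes):

from typing import Optional


class BaseAuthenticator:
    # The base models the token as a plain attribute.
    access_token: str = ""


class DeclarativeAuthenticator(BaseAuthenticator):
    def __init__(self) -> None:
        self._access_token: Optional[str] = None

    @property
    # pyrefly: ignore # bad-override
    def access_token(self) -> str:
        # Overriding an attribute with a read-only property changes
        # the member's shape, which the checker reports.
        if self._access_token is None:
            raise ValueError("access_token is not set")
        return self._access_token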
1 change: 1 addition & 0 deletions airbyte_cdk/sources/declarative/auth/token_provider.py
@@ -65,6 +65,7 @@ def _refresh(self) -> None:
raise ReadException("Failed to get session token, response got ignored by requester")
session_token = dpath.get(next(self.decoder.decode(response)), self.session_token_path)
if self.expiration_duration is not None:
# pyrefly: ignore # bad-assignment
self._next_expiration_time = ab_datetime_now() + self.expiration_duration
self._token = session_token # type: ignore # Returned decoded response will be Mapping and therefore session_token will be str or None

@@ -53,6 +53,7 @@ def check_connection(
return False, reason
except Exception as error:
error_message = (
# pyrefly: ignore # unbound-name
f"Encountered an error trying to connect to stream {stream.name}. Error: {error}"
)
logger.error(error_message, exc_info=True)
1 change: 1 addition & 0 deletions airbyte_cdk/sources/declarative/checks/check_stream.py
@@ -114,6 +114,7 @@ def _check_dynamic_streams_availability(
f"Dynamic stream {check_config.dynamic_stream_name} is not found in manifest.",
)

# pyrefly: ignore # no-matching-overload
generated = generated_streams.get(check_config.dynamic_stream_name, [])
stream_availability, message = self._check_generated_streams_availability(
generated, stream_name_to_stream, logger, check_config.stream_count