Skip to content

Ensure delete API is consistent with other clients #1867

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,16 @@ def get_schema(self, schema_id: int) -> Optional[Schema]:
rs = self.schema_id_index.get(schema_id, None)
return rs.schema if rs else None

def get_schema_id_by_subject(self, subject: str, schema: Schema) -> Optional[int]:
with self.lock:
if schema in self.subject_schemas[subject]:
return self.schema_index.get(schema, None)
return None

def get_registered_schema_by_schema(
self, subject: str,
self,
subject_name: str,
schema: Schema
) -> Optional[RegisteredSchema]:
with self.lock:
for rs in self.subject_schemas[subject]:
if rs.schema == schema:
return rs
if subject_name in self.subject_schemas:
for rs in self.subject_schemas[subject_name]:
if rs.schema == schema:
return rs
return None

def get_version(self, subject_name: str, version: int) -> Optional[RegisteredSchema]:
Expand Down Expand Up @@ -119,15 +115,18 @@ def remove_by_schema(self, registered_schema: RegisteredSchema):
if subject_name in self.subject_schemas:
self.subject_schemas[subject_name].remove(registered_schema)

def remove_by_subject(self, subject_name: str):
def remove_by_subject(self, subject_name: str) -> List[int]:
with self.lock:
versions = []
if subject_name in self.subject_schemas:
for rs in self.subject_schemas[subject_name]:
versions.append(rs.version)
schema_id = self.schema_index.pop(rs.schema, None)
if schema_id is not None:
self.schema_id_index.pop(schema_id, None)

del self.subject_schemas[subject_name]
return versions

def clear(self):
with self.lock:
Expand All @@ -153,14 +152,8 @@ def register_schema_full_response(
self, subject_name: str, schema: 'Schema',
normalize_schemas: bool = False
) -> 'RegisteredSchema':
schema_id = self._store.get_schema_id_by_subject(subject_name, schema)
if schema_id is not None:
registered_schema = RegisteredSchema(
schema_id=schema_id,
schema=schema,
subject=subject_name,
version=None
)
registered_schema = self._store.get_registered_schema_by_schema(subject_name, schema)
if registered_schema is not None:
return registered_schema

latest_schema = self._store.get_latest_version(subject_name)
Expand Down Expand Up @@ -198,11 +191,11 @@ def lookup_schema(

raise SchemaRegistryError(404, 40400, "Schema Not Found")

def get_subjects(self):
def get_subjects(self) -> List[str]:
return self._store.get_subjects()

def delete_subject(self, subject_name: str, permanent: bool = False):
self._store.remove_by_subject(subject_name)
def delete_subject(self, subject_name: str, permanent: bool = False) -> List[int]:
return self._store.remove_by_subject(subject_name)

def get_latest_version(self, subject_name: str, fmt: str = None) -> 'RegisteredSchema':
registered_schema = self._store.get_latest_version(subject_name)
Expand Down Expand Up @@ -234,7 +227,7 @@ def get_version(
def get_versions(self, subject_name: str) -> List[int]:
return self._store.get_versions(subject_name)

def delete_version(self, subject_name: str, version: int) -> int:
def delete_version(self, subject_name: str, version: int, permanent: bool = False) -> int:
registered_schema = self._store.get_version(subject_name, version)
if registered_schema is not None:
self._store.remove_by_schema(registered_schema)
Expand Down
35 changes: 20 additions & 15 deletions src/confluent_kafka/schema_registry/schema_registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ def lookup_schema(

return registered_schema

def get_subjects(self):
def get_subjects(self) -> List[str]:
"""
List all subjects registered with the Schema Registry

Expand All @@ -786,7 +786,7 @@ def get_subjects(self):

return self._rest_client.get('subjects')

def delete_subject(self, subject_name: str, permanent: bool = False):
def delete_subject(self, subject_name: str, permanent: bool = False) -> List[int]:
"""
Deletes the specified subject and its associated compatibility level if
registered. It is recommended to use this API only when a topic needs
Expand All @@ -806,14 +806,13 @@ def delete_subject(self, subject_name: str, permanent: bool = False):
`DELETE Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)>`_
""" # noqa: E501

versions = self._rest_client.delete('subjects/{}'
.format(_urlencode(subject_name)))

if permanent:
self._rest_client.delete('subjects/{}?permanent=true'
.format(_urlencode(subject_name)))

self._cache.remove_by_subject(subject_name)
versions = self._rest_client.delete('subjects/{}?permanent=true'
.format(_urlencode(subject_name)))
self._cache.remove_by_subject(subject_name)
else:
versions = self._rest_client.delete('subjects/{}'
.format(_urlencode(subject_name)))

return versions

Expand Down Expand Up @@ -949,7 +948,7 @@ def get_versions(self, subject_name: str) -> List[int]:

return self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name)))

def delete_version(self, subject_name: str, version: int) -> int:
def delete_version(self, subject_name: str, version: int, permanent: bool = False) -> int:
"""
Deletes a specific version registered to ``subject_name``.

Expand All @@ -958,6 +957,8 @@ def delete_version(self, subject_name: str, version: int) -> int:

version (int): Version number

permanent (bool): True for a hard delete, False (default) for a soft delete

Returns:
int: Version number which was deleted

Expand All @@ -968,11 +969,15 @@ def delete_version(self, subject_name: str, version: int) -> int:
`Delete Subject Version API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)-versions-(versionId-%20version)>`_
""" # noqa: E501

response = self._rest_client.delete('subjects/{}/versions/{}'.
format(_urlencode(subject_name),
version))

self._cache.remove_by_subject_version(subject_name, version)
if permanent:
response = self._rest_client.delete('subjects/{}/versions/{}?permanent=true'
.format(_urlencode(subject_name),
version))
self._cache.remove_by_subject_version(subject_name, version)
else:
response = self._rest_client.delete('subjects/{}/versions/{}'
.format(_urlencode(subject_name),
version))

return response

Expand Down
6 changes: 3 additions & 3 deletions src/confluent_kafka/schema_registry/serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,15 +291,15 @@ def _execute_rules(
return message
rules: Optional[List[Rule]] = None
if rule_mode == RuleMode.UPGRADE:
if target.rule_set is not None:
if target is not None and target.rule_set is not None:
rules = target.rule_set.migration_rules
elif rule_mode == RuleMode.DOWNGRADE:
if source.rule_set is not None:
if source is not None and source.rule_set is not None:
rules = source.rule_set.migration_rules
rules = rules[:] if rules else []
rules.reverse()
else:
if target.rule_set is not None:
if target is not None and target.rule_set is not None:
rules = target.rule_set.domain_rules
if rule_mode == RuleMode.READ:
# Execute read rules in reverse order for symmetry
Expand Down