Skip to content

Commit 057640c

Browse files
authored
Ensure delete API is consistent with other clients (#1867)
* Add permanent flag to delete version API * Minor fix * Fix mock * Fix mock * Fix mock * Minor cleanup
1 parent 888829a commit 057640c

File tree

3 files changed

+39
-41
lines changed

3 files changed

+39
-41
lines changed

src/confluent_kafka/schema_registry/mock_schema_registry_client.py

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -52,20 +52,16 @@ def get_schema(self, schema_id: int) -> Optional[Schema]:
5252
rs = self.schema_id_index.get(schema_id, None)
5353
return rs.schema if rs else None
5454

55-
def get_schema_id_by_subject(self, subject: str, schema: Schema) -> Optional[int]:
56-
with self.lock:
57-
if schema in self.subject_schemas[subject]:
58-
return self.schema_index.get(schema, None)
59-
return None
60-
6155
def get_registered_schema_by_schema(
62-
self, subject: str,
56+
self,
57+
subject_name: str,
6358
schema: Schema
6459
) -> Optional[RegisteredSchema]:
6560
with self.lock:
66-
for rs in self.subject_schemas[subject]:
67-
if rs.schema == schema:
68-
return rs
61+
if subject_name in self.subject_schemas:
62+
for rs in self.subject_schemas[subject_name]:
63+
if rs.schema == schema:
64+
return rs
6965
return None
7066

7167
def get_version(self, subject_name: str, version: int) -> Optional[RegisteredSchema]:
@@ -119,15 +115,18 @@ def remove_by_schema(self, registered_schema: RegisteredSchema):
119115
if subject_name in self.subject_schemas:
120116
self.subject_schemas[subject_name].remove(registered_schema)
121117

122-
def remove_by_subject(self, subject_name: str):
118+
def remove_by_subject(self, subject_name: str) -> List[int]:
123119
with self.lock:
120+
versions = []
124121
if subject_name in self.subject_schemas:
125122
for rs in self.subject_schemas[subject_name]:
123+
versions.append(rs.version)
126124
schema_id = self.schema_index.pop(rs.schema, None)
127125
if schema_id is not None:
128126
self.schema_id_index.pop(schema_id, None)
129127

130128
del self.subject_schemas[subject_name]
129+
return versions
131130

132131
def clear(self):
133132
with self.lock:
@@ -153,14 +152,8 @@ def register_schema_full_response(
153152
self, subject_name: str, schema: 'Schema',
154153
normalize_schemas: bool = False
155154
) -> 'RegisteredSchema':
156-
schema_id = self._store.get_schema_id_by_subject(subject_name, schema)
157-
if schema_id is not None:
158-
registered_schema = RegisteredSchema(
159-
schema_id=schema_id,
160-
schema=schema,
161-
subject=subject_name,
162-
version=None
163-
)
155+
registered_schema = self._store.get_registered_schema_by_schema(subject_name, schema)
156+
if registered_schema is not None:
164157
return registered_schema
165158

166159
latest_schema = self._store.get_latest_version(subject_name)
@@ -198,11 +191,11 @@ def lookup_schema(
198191

199192
raise SchemaRegistryError(404, 40400, "Schema Not Found")
200193

201-
def get_subjects(self):
194+
def get_subjects(self) -> List[str]:
202195
return self._store.get_subjects()
203196

204-
def delete_subject(self, subject_name: str, permanent: bool = False):
205-
self._store.remove_by_subject(subject_name)
197+
def delete_subject(self, subject_name: str, permanent: bool = False) -> List[int]:
198+
return self._store.remove_by_subject(subject_name)
206199

207200
def get_latest_version(self, subject_name: str, fmt: str = None) -> 'RegisteredSchema':
208201
registered_schema = self._store.get_latest_version(subject_name)
@@ -234,7 +227,7 @@ def get_version(
234227
def get_versions(self, subject_name: str) -> List[int]:
235228
return self._store.get_versions(subject_name)
236229

237-
def delete_version(self, subject_name: str, version: int) -> int:
230+
def delete_version(self, subject_name: str, version: int, permanent: bool = False) -> int:
238231
registered_schema = self._store.get_version(subject_name, version)
239232
if registered_schema is not None:
240233
self._store.remove_by_schema(registered_schema)

src/confluent_kafka/schema_registry/schema_registry_client.py

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -770,7 +770,7 @@ def lookup_schema(
770770

771771
return registered_schema
772772

773-
def get_subjects(self):
773+
def get_subjects(self) -> List[str]:
774774
"""
775775
List all subjects registered with the Schema Registry
776776
@@ -786,7 +786,7 @@ def get_subjects(self):
786786

787787
return self._rest_client.get('subjects')
788788

789-
def delete_subject(self, subject_name: str, permanent: bool = False):
789+
def delete_subject(self, subject_name: str, permanent: bool = False) -> List[int]:
790790
"""
791791
Deletes the specified subject and its associated compatibility level if
792792
registered. It is recommended to use this API only when a topic needs
@@ -806,14 +806,13 @@ def delete_subject(self, subject_name: str, permanent: bool = False):
806806
`DELETE Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)>`_
807807
""" # noqa: E501
808808

809-
versions = self._rest_client.delete('subjects/{}'
810-
.format(_urlencode(subject_name)))
811-
812809
if permanent:
813-
self._rest_client.delete('subjects/{}?permanent=true'
814-
.format(_urlencode(subject_name)))
815-
816-
self._cache.remove_by_subject(subject_name)
810+
versions = self._rest_client.delete('subjects/{}?permanent=true'
811+
.format(_urlencode(subject_name)))
812+
self._cache.remove_by_subject(subject_name)
813+
else:
814+
versions = self._rest_client.delete('subjects/{}'
815+
.format(_urlencode(subject_name)))
817816

818817
return versions
819818

@@ -949,7 +948,7 @@ def get_versions(self, subject_name: str) -> List[int]:
949948

950949
return self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name)))
951950

952-
def delete_version(self, subject_name: str, version: int) -> int:
951+
def delete_version(self, subject_name: str, version: int, permanent: bool = False) -> int:
953952
"""
954953
Deletes a specific version registered to ``subject_name``.
955954
@@ -958,6 +957,8 @@ def delete_version(self, subject_name: str, version: int) -> int:
958957
959958
version (int): Version number
960959
960+
permanent (bool): True for a hard delete, False (default) for a soft delete
961+
961962
Returns:
962963
int: Version number which was deleted
963964
@@ -968,11 +969,15 @@ def delete_version(self, subject_name: str, version: int) -> int:
968969
`Delete Subject Version API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)-versions-(versionId-%20version)>`_
969970
""" # noqa: E501
970971

971-
response = self._rest_client.delete('subjects/{}/versions/{}'.
972-
format(_urlencode(subject_name),
973-
version))
974-
975-
self._cache.remove_by_subject_version(subject_name, version)
972+
if permanent:
973+
response = self._rest_client.delete('subjects/{}/versions/{}?permanent=true'
974+
.format(_urlencode(subject_name),
975+
version))
976+
self._cache.remove_by_subject_version(subject_name, version)
977+
else:
978+
response = self._rest_client.delete('subjects/{}/versions/{}'
979+
.format(_urlencode(subject_name),
980+
version))
976981

977982
return response
978983

src/confluent_kafka/schema_registry/serde.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -291,15 +291,15 @@ def _execute_rules(
291291
return message
292292
rules: Optional[List[Rule]] = None
293293
if rule_mode == RuleMode.UPGRADE:
294-
if target.rule_set is not None:
294+
if target is not None and target.rule_set is not None:
295295
rules = target.rule_set.migration_rules
296296
elif rule_mode == RuleMode.DOWNGRADE:
297-
if source.rule_set is not None:
297+
if source is not None and source.rule_set is not None:
298298
rules = source.rule_set.migration_rules
299299
rules = rules[:] if rules else []
300300
rules.reverse()
301301
else:
302-
if target.rule_set is not None:
302+
if target is not None and target.rule_set is not None:
303303
rules = target.rule_set.domain_rules
304304
if rule_mode == RuleMode.READ:
305305
# Execute read rules in reverse order for symmetry

0 commit comments

Comments
 (0)