diff --git a/src/confluent_kafka/schema_registry/mock_schema_registry_client.py b/src/confluent_kafka/schema_registry/mock_schema_registry_client.py index 892f00ec8..78267dedf 100644 --- a/src/confluent_kafka/schema_registry/mock_schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/mock_schema_registry_client.py @@ -52,20 +52,16 @@ def get_schema(self, schema_id: int) -> Optional[Schema]: rs = self.schema_id_index.get(schema_id, None) return rs.schema if rs else None - def get_schema_id_by_subject(self, subject: str, schema: Schema) -> Optional[int]: - with self.lock: - if schema in self.subject_schemas[subject]: - return self.schema_index.get(schema, None) - return None - def get_registered_schema_by_schema( - self, subject: str, + self, + subject_name: str, schema: Schema ) -> Optional[RegisteredSchema]: with self.lock: - for rs in self.subject_schemas[subject]: - if rs.schema == schema: - return rs + if subject_name in self.subject_schemas: + for rs in self.subject_schemas[subject_name]: + if rs.schema == schema: + return rs return None def get_version(self, subject_name: str, version: int) -> Optional[RegisteredSchema]: @@ -119,15 +115,18 @@ def remove_by_schema(self, registered_schema: RegisteredSchema): if subject_name in self.subject_schemas: self.subject_schemas[subject_name].remove(registered_schema) - def remove_by_subject(self, subject_name: str): + def remove_by_subject(self, subject_name: str) -> List[int]: with self.lock: + versions = [] if subject_name in self.subject_schemas: for rs in self.subject_schemas[subject_name]: + versions.append(rs.version) schema_id = self.schema_index.pop(rs.schema, None) if schema_id is not None: self.schema_id_index.pop(schema_id, None) del self.subject_schemas[subject_name] + return versions def clear(self): with self.lock: @@ -153,14 +152,8 @@ def register_schema_full_response( self, subject_name: str, schema: 'Schema', normalize_schemas: bool = False ) -> 'RegisteredSchema': - schema_id = self._store.get_schema_id_by_subject(subject_name, schema) - if schema_id is not None: - registered_schema = RegisteredSchema( - schema_id=schema_id, - schema=schema, - subject=subject_name, - version=None - ) + registered_schema = self._store.get_registered_schema_by_schema(subject_name, schema) + if registered_schema is not None: return registered_schema latest_schema = self._store.get_latest_version(subject_name) @@ -198,11 +191,11 @@ def lookup_schema( raise SchemaRegistryError(404, 40400, "Schema Not Found") - def get_subjects(self): + def get_subjects(self) -> List[str]: return self._store.get_subjects() - def delete_subject(self, subject_name: str, permanent: bool = False): - self._store.remove_by_subject(subject_name) + def delete_subject(self, subject_name: str, permanent: bool = False) -> List[int]: + return self._store.remove_by_subject(subject_name) def get_latest_version(self, subject_name: str, fmt: str = None) -> 'RegisteredSchema': registered_schema = self._store.get_latest_version(subject_name) @@ -234,7 +227,7 @@ def get_version( def get_versions(self, subject_name: str) -> List[int]: return self._store.get_versions(subject_name) - def delete_version(self, subject_name: str, version: int) -> int: + def delete_version(self, subject_name: str, version: int, permanent: bool = False) -> int: registered_schema = self._store.get_version(subject_name, version) if registered_schema is not None: self._store.remove_by_schema(registered_schema) diff --git a/src/confluent_kafka/schema_registry/schema_registry_client.py b/src/confluent_kafka/schema_registry/schema_registry_client.py index 1fb719d2e..1e51b52b0 100644 --- a/src/confluent_kafka/schema_registry/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/schema_registry_client.py @@ -770,7 +770,7 @@ def lookup_schema( return registered_schema - def get_subjects(self): + def get_subjects(self) -> List[str]: """ List all subjects registered with the Schema Registry @@ -786,7 +786,7 @@ def get_subjects(self): return self._rest_client.get('subjects') - def delete_subject(self, subject_name: str, permanent: bool = False): + def delete_subject(self, subject_name: str, permanent: bool = False) -> List[int]: """ Deletes the specified subject and its associated compatibility level if registered. It is recommended to use this API only when a topic needs @@ -806,14 +806,13 @@ def delete_subject(self, subject_name: str, permanent: bool = False): `DELETE Subject API Reference `_ """ # noqa: E501 - versions = self._rest_client.delete('subjects/{}' - .format(_urlencode(subject_name))) - if permanent: - self._rest_client.delete('subjects/{}?permanent=true' - .format(_urlencode(subject_name))) - - self._cache.remove_by_subject(subject_name) + versions = self._rest_client.delete('subjects/{}?permanent=true' + .format(_urlencode(subject_name))) + self._cache.remove_by_subject(subject_name) + else: + versions = self._rest_client.delete('subjects/{}' + .format(_urlencode(subject_name))) return versions @@ -949,7 +948,7 @@ def get_versions(self, subject_name: str) -> List[int]: return self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name))) - def delete_version(self, subject_name: str, version: int) -> int: + def delete_version(self, subject_name: str, version: int, permanent: bool = False) -> int: """ Deletes a specific version registered to ``subject_name``. @@ -958,6 +957,8 @@ def delete_version(self, subject_name: str, version: int) -> int: version (int): Version number + permanent (bool): True for a hard delete, False (default) for a soft delete + Returns: int: Version number which was deleted @@ -968,11 +969,15 @@ def delete_version(self, subject_name: str, version: int) -> int: `Delete Subject Version API Reference `_ """ # noqa: E501 - response = self._rest_client.delete('subjects/{}/versions/{}'. - format(_urlencode(subject_name), - version)) - - self._cache.remove_by_subject_version(subject_name, version) + if permanent: + response = self._rest_client.delete('subjects/{}/versions/{}?permanent=true' + .format(_urlencode(subject_name), + version)) + self._cache.remove_by_subject_version(subject_name, version) + else: + response = self._rest_client.delete('subjects/{}/versions/{}' + .format(_urlencode(subject_name), + version)) return response diff --git a/src/confluent_kafka/schema_registry/serde.py b/src/confluent_kafka/schema_registry/serde.py index 5462dc924..e1bd16206 100644 --- a/src/confluent_kafka/schema_registry/serde.py +++ b/src/confluent_kafka/schema_registry/serde.py @@ -291,15 +291,15 @@ def _execute_rules( return message rules: Optional[List[Rule]] = None if rule_mode == RuleMode.UPGRADE: - if target.rule_set is not None: + if target is not None and target.rule_set is not None: rules = target.rule_set.migration_rules elif rule_mode == RuleMode.DOWNGRADE: - if source.rule_set is not None: + if source is not None and source.rule_set is not None: rules = source.rule_set.migration_rules rules = rules[:] if rules else [] rules.reverse() else: - if target.rule_set is not None: + if target is not None and target.rule_set is not None: rules = target.rule_set.domain_rules if rule_mode == RuleMode.READ: # Execute read rules in reverse order for symmetry