Skip to content

Commit 62b9036

Browse files
new sr endpoint client addition (#3899)
1 parent d6feacb commit 62b9036

File tree

9 files changed

+120
-27
lines changed

9 files changed

+120
-27
lines changed

client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.common.cache.CacheLoader;
2323
import com.google.common.cache.LoadingCache;
2424
import com.google.common.collect.ImmutableMap;
25+
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaRegistryServerVersion;
2526
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
2627
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
2728
import io.confluent.kafka.schemaregistry.utils.QualifiedSubject;
@@ -990,6 +991,12 @@ public SchemaRegistryDeployment getSchemaRegistryDeployment()
990991
return restService.getSchemaRegistryDeployment();
991992
}
992993

994+
@Override
995+
public SchemaRegistryServerVersion getSchemaRegistryServerVersion()
996+
throws IOException, RestClientException {
997+
return restService.getSchemaRegistryServerVersion();
998+
}
999+
9931000
@Override
9941001
public Collection<String> getAllSubjects() throws IOException, RestClientException {
9951002
return restService.getAllSubjects();

client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
2424
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
2525
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaRegistryDeployment;
26+
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaRegistryServerVersion;
2627
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
2728
import java.io.Closeable;
2829
import java.io.IOException;
@@ -295,6 +296,11 @@ default SchemaRegistryDeployment getSchemaRegistryDeployment()
295296
throw new UnsupportedOperationException();
296297
}
297298

299+
default SchemaRegistryServerVersion getSchemaRegistryServerVersion()
300+
throws IOException, RestClientException {
301+
throw new UnsupportedOperationException();
302+
}
303+
298304
public Collection<String> getAllSubjects() throws IOException, RestClientException;
299305

300306
default Collection<String> getAllSubjects(boolean lookupDeletedSubject) throws IOException,

client/src/test/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClientTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
2323
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
2424
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
25+
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaRegistryDeployment;
26+
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaRegistryServerVersion;
27+
import java.util.ArrayList;
2528
import org.junit.Before;
2629
import org.junit.Test;
2730

@@ -888,6 +891,44 @@ public void testMissingSchemaCache() throws Exception {
888891
client.getId(SUBJECT_0, AVRO_SCHEMA_0);
889892
}
890893

894+
@Test
895+
public void testGetSchemaRegistryDeployment() throws Exception {
896+
List<String> deploymentAttributes = new ArrayList<>(Collections.singleton("deploymentScope:opensource"));
897+
SchemaRegistryDeployment expectedDeployment = new SchemaRegistryDeployment(deploymentAttributes);
898+
899+
expect(restService.getSchemaRegistryDeployment())
900+
.andReturn(expectedDeployment);
901+
902+
replay(restService);
903+
904+
SchemaRegistryDeployment deployment = client.getSchemaRegistryDeployment();
905+
906+
assertNotNull(deployment);
907+
assertEquals(expectedDeployment.getAttributes(), deployment.getAttributes());
908+
909+
verify(restService);
910+
}
911+
912+
@Test
913+
public void testGetSchemaRegistryServerVersion() throws Exception {
914+
String version = "7.5.0";
915+
String commitId = "abc123def456";
916+
SchemaRegistryServerVersion expectedVersion = new SchemaRegistryServerVersion(version, commitId);
917+
918+
expect(restService.getSchemaRegistryServerVersion())
919+
.andReturn(expectedVersion);
920+
921+
replay(restService);
922+
923+
SchemaRegistryServerVersion serverVersion = client.getSchemaRegistryServerVersion();
924+
925+
assertNotNull(serverVersion);
926+
assertEquals(version, serverVersion.getVersion());
927+
assertEquals(commitId, serverVersion.getCommitId());
928+
929+
verify(restService);
930+
}
931+
891932

892933
private static AvroSchema avroSchema(final int i) {
893934
return new AvroSchema(avroSchemaString(i));

core/src/main/java/io/confluent/kafka/schemaregistry/rest/client/LocalSchemaRegistryClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
3030
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
3131
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaRegistryDeployment;
32+
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaRegistryServerVersion;
3233
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
3334
import io.confluent.kafka.schemaregistry.client.rest.entities.ExtendedSchema;
3435
import io.confluent.kafka.schemaregistry.client.rest.entities.SubjectVersion;
@@ -52,6 +53,7 @@
5253
import io.confluent.kafka.schemaregistry.rest.VersionId;
5354
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors;
5455
import io.confluent.kafka.schemaregistry.rest.exceptions.RestInvalidCompatibilityException;
56+
import io.confluent.kafka.schemaregistry.utils.AppInfoParser;
5557
import io.confluent.kafka.schemaregistry.utils.Props;
5658
import io.confluent.kafka.schemaregistry.rest.exceptions.RestInvalidModeException;
5759
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
@@ -692,6 +694,12 @@ public SchemaRegistryDeployment getSchemaRegistryDeployment()
692694
return Props.getSchemaRegistryDeployment(schemaRegistry.properties());
693695
}
694696

697+
@Override
698+
public SchemaRegistryServerVersion getSchemaRegistryServerVersion()
699+
throws IOException, RestClientException {
700+
return new SchemaRegistryServerVersion(AppInfoParser.getVersion(), AppInfoParser.getCommitId());
701+
}
702+
695703
@Override
696704
public void reset() {
697705
}

core/src/main/java/io/confluent/kafka/schemaregistry/utils/Props.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public static SchemaRegistryDeployment getSchemaRegistryDeployment(Map<String, O
3636
List<?> srDeploymentList = (List<?>) srDeployment;
3737
// Validate and process each element
3838
List<String> processedList = srDeploymentList.stream().map(
39-
item -> item.toString().trim().toLowerCase()
39+
item -> item.toString().trim()
4040
).collect(Collectors.toList());
4141
return new SchemaRegistryDeployment(processedList);
4242
} else {

core/src/test/java/io/confluent/kafka/schemaregistry/rest/client/LocalSchemaRegistryClientTest.java

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,31 +16,37 @@
1616
package io.confluent.kafka.schemaregistry.rest.client;
1717

1818

19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertNotNull;
21+
import static org.junit.Assert.assertTrue;
22+
import static org.junit.jupiter.api.Assertions.assertThrows;
23+
1924
import io.confluent.kafka.schemaregistry.ClusterTestHarness;
2025
import io.confluent.kafka.schemaregistry.ParsedSchema;
2126
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
2227
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
2328
import io.confluent.kafka.schemaregistry.client.rest.entities.Config;
2429
import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
2530
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
26-
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
31+
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaRegistryDeployment;
32+
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaRegistryServerVersion;
2733
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
2834
import io.confluent.kafka.schemaregistry.rest.exceptions.RestInvalidSchemaException;
2935
import io.confluent.kafka.schemaregistry.rest.exceptions.RestOperationNotPermittedException;
3036
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
3137
import io.confluent.kafka.schemaregistry.storage.StoreUtils;
3238
import io.confluent.kafka.schemaregistry.storage.serialization.SchemaRegistrySerializer;
33-
34-
import java.io.IOException;
35-
import java.util.*;
36-
39+
import io.confluent.kafka.schemaregistry.utils.Props;
3740
import io.confluent.rest.exceptions.RestNotFoundException;
41+
import java.util.ArrayList;
42+
import java.util.Collections;
43+
import java.util.HashMap;
44+
import java.util.List;
45+
import java.util.Map;
46+
import java.util.Optional;
47+
import java.util.Properties;
3848
import org.junit.Before;
3949
import org.junit.jupiter.api.Test;
40-
import static org.junit.jupiter.api.Assertions.assertThrows;
41-
42-
import static org.junit.Assert.*;
43-
import static org.junit.Assert.assertEquals;
4450

4551
public class LocalSchemaRegistryClientTest extends ClusterTestHarness {
4652

@@ -214,4 +220,27 @@ public void testGetByVersion() throws Exception {
214220
Schema s2 = client.getByVersion(SUBJECT2, 1, false);
215221
assertEquals(id2, s2.getId().intValue());
216222
}
223+
224+
@Test
225+
public void testGetSchemaRegistryDeployment() throws Exception {
226+
Map<String, Object> props = new HashMap<>();
227+
List<String> deploymentAttributes = new ArrayList<String>(Collections.singleton("deploymentScope:opensource"));
228+
props.put(Props.PROPERTY_SCHEMA_REGISTRY_DEPLOYMENT_ATTRIBUTES, deploymentAttributes);
229+
230+
SchemaRegistryDeployment deployment = client.getSchemaRegistryDeployment();
231+
232+
assertNotNull(deployment);
233+
assertEquals(deployment.getAttributes(),
234+
new ArrayList<String>()
235+
);
236+
}
237+
238+
@Test
239+
public void testGetSchemaRegistryServerVersion() throws Exception {
240+
SchemaRegistryServerVersion version = client.getSchemaRegistryServerVersion();
241+
242+
assertNotNull(version);
243+
assertNotNull(version.getVersion());
244+
assertNotNull(version.getCommitId());
245+
}
217246
}

core/src/test/java/io/confluent/kafka/schemaregistry/utils/PropsTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ public void testGetSchemaRegistryDeploymentWithMixedCaseStrings() {
182182

183183
SchemaRegistryDeployment result = Props.getSchemaRegistryDeployment(props);
184184
assertNotNull("Should return SchemaRegistryDeployment for mixed case strings", result);
185-
List<String> expected = Arrays.asList("confluent", "enterprise", "opensource");
185+
List<String> expected = Arrays.asList("CONFLUENT", "Enterprise", "opensource");
186186
assertEquals("Should convert to lowercase", expected, result.getAttributes());
187187
}
188188

@@ -223,14 +223,14 @@ public void testGetSchemaRegistryDeploymentWithComplexObjects() {
223223
}
224224

225225
@Test
226-
public void testGetSchemaRegistryDeploymentTrimsAndLowercase() {
226+
public void testGetSchemaRegistryDeploymentTrims() {
227227
Map<String, Object> props = new HashMap<>();
228228
List<String> attributes = Arrays.asList(" CONFLUENT ", "Enterprise ", " opensource");
229229
props.put(Props.PROPERTY_SCHEMA_REGISTRY_DEPLOYMENT_ATTRIBUTES, attributes);
230230

231231
SchemaRegistryDeployment result = Props.getSchemaRegistryDeployment(props);
232232
assertNotNull("Should return SchemaRegistryDeployment", result);
233-
List<String> expected = Arrays.asList("confluent", "enterprise", "opensource");
233+
List<String> expected = Arrays.asList("CONFLUENT", "Enterprise", "opensource");
234234
assertEquals("Should trim whitespace and convert to lowercase", expected, result.getAttributes());
235235
}
236236
}

core/src/test/java/io/confluent/kafka/serializers/protobuf/test/Ref.java

Lines changed: 6 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/test/java/io/confluent/kafka/serializers/protobuf/test/Root.java

Lines changed: 10 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)