diff --git a/README.md b/README.md index 54bd25bc..6b54bd3c 100644 --- a/README.md +++ b/README.md @@ -239,6 +239,11 @@ If the Spring Boot application needs to connect to SQS queues across multiple AW which will be able to obtain a specific `SqsAsyncClient` based on an identifier. For more information on how to do this, take a look at the documentation at [How To Connect to Multiple AWS Accounts](doc/how-to-guides/spring/spring-how-to-connect-to-multiple-aws-accounts.md) +### Versioning Message Payloads using Apache Avro Schemas +As the application grows, it may be beneficial to allow for versioning of the schema so that the consumer can still serialize messages from producers sending +different versions of the schema. To allow for this the [spring-cloud-schema-registry-extension](extensions/spring-cloud-schema-registry-extension) was written +to support this functionality. See the [README.md](extensions/spring-cloud-schema-registry-extension/README.md) for this extension for more details. + ### Comparing Libraries If you want to see the difference between this library and others like the [Spring Cloud AWS Messaging](https://github.com/spring-cloud/spring-cloud-aws/tree/master/spring-cloud-aws-messaging) and diff --git a/configuration/spotbugs/bugsExcludeFilter.xml b/configuration/spotbugs/bugsExcludeFilter.xml index fc5a16cb..20a95dc3 100644 --- a/configuration/spotbugs/bugsExcludeFilter.xml +++ b/configuration/spotbugs/bugsExcludeFilter.xml @@ -4,4 +4,7 @@ + + + \ No newline at end of file diff --git a/doc/documentation.md b/doc/documentation.md index f5b14ca1..271305dd 100644 --- a/doc/documentation.md +++ b/doc/documentation.md @@ -32,6 +32,8 @@ more in depth understanding take a look at the JavaDoc for the [java-dynamic-sqs processing of messages for specific queue listeners 1. [How to connect to multiple AWS Accounts](how-to-guides/spring/spring-how-to-connect-to-multiple-aws-accounts.md): guide for listening to queues across multiple AWS Accounts + 1. [How to version message payload schemas](how-to-guides/spring/spring-how-to-version-payload-schemas-using-spring-cloud-schema-registry.md): guide + for versioning payloads using Avro and the Spring Cloud Schema Registry. 1. Local Development: 1. [Setting up IntelliJ](local-development/setting-up-intellij.md): steps for setting IntelliJ up for development, e.g. configuring checkstyle, Lombok, etc diff --git a/doc/how-to-guides/spring/spring-how-to-version-payload-schemas-using-spring-cloud-schema-registry.md b/doc/how-to-guides/spring/spring-how-to-version-payload-schemas-using-spring-cloud-schema-registry.md new file mode 100644 index 00000000..9b25a99f --- /dev/null +++ b/doc/how-to-guides/spring/spring-how-to-version-payload-schemas-using-spring-cloud-schema-registry.md @@ -0,0 +1,70 @@ +# Spring - How to version message payload Schemas using Spring Cloud Schema Registry +As your application grows over time the format of the data that needs to be sent in the SQS messages may change as well. To allow for +these changes, the [Spring Cloud Schema Registry](https://cloud.spring.io/spring-cloud-static/spring-cloud-schema-registry/1.0.0.RC1/reference/html/spring-cloud-schema-registry.html) +can be used to track the version of your schemas, allowing the SQS consumer to be able to interpret multiple versions of your payload. + +## Full reference +For a full working solution of this feature, take a look at the [Spring Cloud Schema Registry Example](../../../examples/spring-cloud-schema-registry-example). + +## Steps to consume messages serialized using Apache Avro +1. Include the `Spring Cloud Schema Registry Extension` dependency + ```xml + + com.jashmore + avro-spring-cloud-schema-registry-extension + ${project.version} + + ``` +1. Define your schemas and map this in your spring `application.yml` + ```yml + spring: + cloud: + schema-registry-client: + endpoint: http://localhost:8990 + schema: + avro: + schema-imports: + - classpath:avro/author.avsc + schema-locations: + - classpath:avro/book.avsc + ``` + In this example above we have a book schema which is dependent on the author schema. We have also hardcoded the Schema Registry + to be at [http://localhost:8990](http://localhost:8990). +1. Create your schemas and place them in your `resources` directory. For example this is an example schema for the Book. + ```json + { + "namespace" : "com.jashmore.sqs.extensions.registry.model", + "type" : "record", + "name" : "Book", + "fields" : [ + { "name":"id","type":"string" }, + { "name":"name","type":"string" }, + { "name":"author","type":"Author" } + ] + } + ``` +1. Enable the extension by annotating the Spring Application + ```java + @EnableSchemaRegistrySqsExtension + @SpringBootApplication + class Application { + // normal code + } + ``` +1. Define your queue listener using the `@SpringCloudSchemaRegistryPayload` to represent the payload that needs to be deserialized from +the message payload. + ```java + @QueueListener(value = "queueName") + public void listen(@SpringCloudSchemaRegistryPayload Book payload) { + log.info("Payload: {}", payload); + } + ``` + +## Steps to produce messages using Avro +You can wrap your `SqsAsyncClient` with the +[AvroSchemaRegistrySqsAsyncClient](../../../util/proxy-method-interceptor/src/main/java/com/jashmore/sqs/registry/AvroSchemaRegistrySqsAsyncClient.java) +to be able to more easily send a message that will be serialized using the Avro Schema. This Avro SQS Client was built for testing purposes and therefore it is +recommended to developer your own logic for sending these messages. + +For a full example of building this client, take a look at the +[Producer Example](../../../examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer). diff --git a/examples/pom.xml b/examples/pom.xml index 54a5800a..71f945a4 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,6 +21,7 @@ core-examples spring-aws-example + spring-cloud-schema-registry-example spring-integration-test-example spring-multiple-aws-account-example spring-starter-examples diff --git a/examples/spring-cloud-schema-registry-example/README.md b/examples/spring-cloud-schema-registry-example/README.md new file mode 100644 index 00000000..524389c7 --- /dev/null +++ b/examples/spring-cloud-schema-registry-example/README.md @@ -0,0 +1,36 @@ +# Spring Cloud Schema Registry Extension Example +This example shows how you can consume messages which have been defined using an [Avro](https://avro.apache.org/docs/1.9.2/gettingstartedjava.html) +Schema and the [Spring Cloud Schema Registry](https://cloud.spring.io/spring-cloud-static/spring-cloud-schema-registry/1.0.0.RC1/reference/html/spring-cloud-schema-registry.html). + +To find the corresponding code look in the [Spring Cloud Schema Registry Extension](../../extensions/spring-cloud-schema-registry-extension) module. + +## Steps +Start each of these applications in new terminals/your IDE: +1. A Spring Cloud Schema Registry server + ```bash + wget -O /tmp/schema-registry-server.jar https://repo.spring.io/libs-release-ossrh-cache/org/springframework/cloud/spring-cloud-schema-registry-server/1.0.3.RELEASE/spring-cloud-schema-registry-server-1.0.3.RELEASE.jar + cd /tmp + java -jar schema-registry-server.jar + ``` +1. A local SQS server using ElasticMQ + ```bash + docker run -p 9324:9324 softwaremill/elasticmq + ``` +1. The SQS consumer service + ```bash + cd java-dynamic-sqs-listener/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-consumer + mvn spring-boot:run + ``` +1. The first SQS producer service + ```bash + cd java-dynamic-sqs-listener/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer + mvn spring-boot:run + ``` +1. The second SQS producer service + ```bash + cd java-dynamic-sqs-listener/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer-2 + mvn spring-boot:run + ``` + +You should now see the consumer receiving messages from both producers and even though the producers are sending +the payload in different schema versions the consumer is still able to process the message. \ No newline at end of file diff --git a/examples/spring-cloud-schema-registry-example/pom.xml b/examples/spring-cloud-schema-registry-example/pom.xml new file mode 100644 index 00000000..7bea6982 --- /dev/null +++ b/examples/spring-cloud-schema-registry-example/pom.xml @@ -0,0 +1,24 @@ + + + + examples + com.jashmore + 3.0.1-SNAPSHOT + + 4.0.0 + + spring-cloud-schema-registry-example + pom + + Java Dynamic SQS Listener - Spring Starter - Spring Cloud Schema Registry Example + Contains examples for serializing messages using Avro and storing these Schema Definitions in the Spring Cloud Schema Registry + + + spring-cloud-schema-registry-consumer + spring-cloud-schema-registry-producer + spring-cloud-schema-registry-producer-two + + + \ No newline at end of file diff --git a/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-consumer/pom.xml b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-consumer/pom.xml new file mode 100644 index 00000000..bf7e5e11 --- /dev/null +++ b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-consumer/pom.xml @@ -0,0 +1,110 @@ + + + + spring-cloud-schema-registry-example + com.jashmore + 3.0.1-SNAPSHOT + + 4.0.0 + + spring-cloud-schema-registry-consumer + + Java Dynamic SQS Listener - Spring Starter - Spring Cloud Schema Registry Example - Consumer + Contains an example of a consumer deserializing a message payload that is in a schema registered in the Spring Cloud Schema Registry. + + + ../../../configuration/spotbugs/bugsExcludeFilter.xml + + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.boot + spring-boot-autoconfigure-processor + + + + com.jashmore + java-dynamic-sqs-listener-spring-starter + ${project.version} + + + + org.projectlombok + lombok + provided + + + + com.jashmore + avro-spring-cloud-schema-registry-extension + ${project.version} + + + + com.jashmore + local-amazon-sqs + ${project.version} + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources/avro + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + com.jashmore.sqs.examples.schemaregistry.ConsumerApplication + + + + + repackage + + + + + + org.apache.avro + avro-maven-plugin + + + generate-sources + + schema + protocol + idl-protocol + + + src/main/resources/avro + + + + + + + \ No newline at end of file diff --git a/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-consumer/src/main/java/com/jashmore/sqs/examples/schemaregistry/ConsumerApplication.java b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-consumer/src/main/java/com/jashmore/sqs/examples/schemaregistry/ConsumerApplication.java new file mode 100644 index 00000000..51462780 --- /dev/null +++ b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-consumer/src/main/java/com/jashmore/sqs/examples/schemaregistry/ConsumerApplication.java @@ -0,0 +1,41 @@ +package com.jashmore.sqs.examples.schemaregistry; + +import com.example.Sensor; +import com.jashmore.sqs.extensions.registry.SpringCloudSchemaRegistryPayload; +import com.jashmore.sqs.extensions.registry.avro.EnableSchemaRegistrySqsExtension; +import com.jashmore.sqs.spring.container.basic.QueueListener; +import com.jashmore.sqs.util.LocalSqsAsyncClient; +import com.jashmore.sqs.util.SqsQueuesConfig; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; + +@Slf4j +@SpringBootApplication +@EnableSchemaRegistrySqsExtension +@SuppressWarnings("checkstyle:javadocmethod") +public class ConsumerApplication { + + public static void main(String[] args) { + SpringApplication.run(ConsumerApplication.class); + } + + @Bean + public SqsAsyncClient sqsAsyncClient() { + return new LocalSqsAsyncClient(SqsQueuesConfig.builder() + .sqsServerUrl("http://localhost:9324") + .queue(SqsQueuesConfig.QueueConfig.builder() + .queueName("test") + .deadLetterQueueName("test-dlq") + .maxReceiveCount(3) + .build()) + .build()); + } + + @QueueListener(value = "test", identifier = "message-listener") + public void listen(@SpringCloudSchemaRegistryPayload Sensor payload) { + log.info("Payload: {}", payload); + } +} diff --git a/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-consumer/src/main/resources/application.yml b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-consumer/src/main/resources/application.yml new file mode 100644 index 00000000..66107ca2 --- /dev/null +++ b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-consumer/src/main/resources/application.yml @@ -0,0 +1,8 @@ +spring: + cloud: + schema-registry-client: + endpoint: http://localhost:8990 + schema: + avro: + schema-locations: + - classpath:avro/sensor.avsc diff --git a/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-consumer/src/main/resources/avro/sensor.avsc b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-consumer/src/main/resources/avro/sensor.avsc new file mode 100644 index 00000000..bdbf1ca2 --- /dev/null +++ b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-consumer/src/main/resources/avro/sensor.avsc @@ -0,0 +1,12 @@ + { + "namespace" : "com.example", + "type" : "record", + "name" : "Sensor", + "fields" : [ + {"name":"id","type":"string"}, + {"name":"internalTemperature", "type":"float", "default":0.0, "aliases":["temperature"]}, + {"name":"externalTemperature", "type":"float", "default":0.0}, + {"name":"acceleration", "type":"float","default":0.0}, + {"name":"velocity","type":"float","default":0.0} + ] +} \ No newline at end of file diff --git a/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-consumer/src/main/resources/logback.xml b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-consumer/src/main/resources/logback.xml new file mode 100644 index 00000000..fb1af384 --- /dev/null +++ b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-consumer/src/main/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + diff --git a/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer-two/pom.xml b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer-two/pom.xml new file mode 100644 index 00000000..8978e3d2 --- /dev/null +++ b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer-two/pom.xml @@ -0,0 +1,103 @@ + + + + spring-cloud-schema-registry-example + com.jashmore + 3.0.1-SNAPSHOT + + 4.0.0 + + spring-cloud-schema-registry-producer-two + + Java Dynamic SQS Listener - Spring Starter - Spring Cloud Schema Registry Example - Producer Two + Includes a second example of a service producing messages whose schema is registered in the Spring Cloud Schema Registry + and is in a different format to the first. + + + ../../../configuration/spotbugs/bugsExcludeFilter.xml + + + + + org.springframework.boot + spring-boot-starter + + + + org.projectlombok + lombok + provided + + + + com.google.guava + guava + + + + com.jashmore + avro-spring-cloud-schema-registry-sqs-client + ${project.version} + + + + software.amazon.awssdk + sqs + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources/avro + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + com.jashmore.sqs.examples.schemaregistry.ProducerTwoApplication + + + + + repackage + + + + + + org.apache.avro + avro-maven-plugin + + + generate-sources-producer-1 + generate-sources + + schema + + + src/main/resources/avro + + + + + + + \ No newline at end of file diff --git a/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer-two/src/main/java/com/jashmore/sqs/examples/schemaregistry/MessageProducers.java b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer-two/src/main/java/com/jashmore/sqs/examples/schemaregistry/MessageProducers.java new file mode 100644 index 00000000..c0b6283c --- /dev/null +++ b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer-two/src/main/java/com/jashmore/sqs/examples/schemaregistry/MessageProducers.java @@ -0,0 +1,40 @@ +package com.jashmore.sqs.examples.schemaregistry; + +import com.google.common.collect.ImmutableList; + +import com.example.Sensor; +import com.jashmore.sqs.registry.AvroSchemaRegistrySqsAsyncClient; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse; + +import java.util.UUID; + +@Slf4j +@Component +@EnableScheduling +@RequiredArgsConstructor +@SuppressWarnings("checkstyle:javadocmethod") +public class MessageProducers { + private final AvroSchemaRegistrySqsAsyncClient sqsAsyncClient; + + @Scheduled(initialDelay = 1000, fixedDelay = 1000) + public void addMessages() { + final Sensor payload = new Sensor("V2-" + UUID.randomUUID().toString(), 1.0F, 1.1F, 1.2F, 1.3F, ImmutableList.of(1.4F), ImmutableList.of(1.5F)); + + sqsAsyncClient.getQueueUrl((request) -> request.queueName("test")) + .thenApply(GetQueueUrlResponse::queueUrl) + .thenCompose(queueUrl -> sqsAsyncClient.sendAvroMessage( + "ProducerV2", "contentType", payload, requestBuilder -> requestBuilder.queueUrl(queueUrl) + )) + .whenComplete((result, throwable) -> { + if (throwable != null) { + log.error("Error sending message", throwable); + } + log.info("Published message with id {}", payload.getId()); + }); + } +} diff --git a/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer-two/src/main/java/com/jashmore/sqs/examples/schemaregistry/ProducerTwoApplication.java b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer-two/src/main/java/com/jashmore/sqs/examples/schemaregistry/ProducerTwoApplication.java new file mode 100644 index 00000000..a24987b3 --- /dev/null +++ b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer-two/src/main/java/com/jashmore/sqs/examples/schemaregistry/ProducerTwoApplication.java @@ -0,0 +1,58 @@ +package com.jashmore.sqs.examples.schemaregistry; + +import com.google.common.collect.ImmutableList; + +import com.jashmore.sqs.registry.AvroSchemaRegistrySqsAsyncClient; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.schema.registry.avro.AvroMessageConverterProperties; +import org.springframework.cloud.schema.registry.avro.AvroSchemaServiceManager; +import org.springframework.cloud.schema.registry.client.EnableSchemaRegistryClient; +import org.springframework.cloud.schema.registry.client.SchemaRegistryClient; +import org.springframework.context.annotation.Bean; +import org.springframework.core.io.Resource; +import org.springframework.scheduling.annotation.EnableScheduling; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; + +import java.net.URI; +import java.util.List; +import java.util.Optional; + +@SpringBootApplication +@EnableScheduling +@EnableSchemaRegistryClient +@SuppressWarnings("checkstyle:javadocmethod") +public class ProducerTwoApplication { + + public static void main(String[] args) { + SpringApplication.run(ProducerTwoApplication.class); + } + + @Bean + public SqsAsyncClient sqsAsyncClient() { + return SqsAsyncClient.builder() + .endpointOverride(URI.create("http://localhost:9324")) + .region(Region.AP_NORTHEAST_2) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("X", "X"))) + .build(); + } + + @Bean + @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") + public AvroSchemaRegistrySqsAsyncClient avroSchemaRegistrySqsAsyncClient(final SqsAsyncClient delegate, + final SchemaRegistryClient schemaRegistryClient, + final AvroSchemaServiceManager avroSchemaServiceManager, + final AvroMessageConverterProperties avroMessageConverterProperties) { + final List schemaImports = Optional.ofNullable(avroMessageConverterProperties.getSchemaImports()) + .map(ImmutableList::copyOf) + .orElse(ImmutableList.of()); + final List schemaLocations = Optional.ofNullable(avroMessageConverterProperties.getSchemaLocations()) + .map(ImmutableList::copyOf) + .orElse(ImmutableList.of()); + + return new AvroSchemaRegistrySqsAsyncClient(delegate, schemaRegistryClient, avroSchemaServiceManager, schemaImports, schemaLocations); + } +} diff --git a/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer-two/src/main/resources/application.yml b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer-two/src/main/resources/application.yml new file mode 100644 index 00000000..af582763 --- /dev/null +++ b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer-two/src/main/resources/application.yml @@ -0,0 +1,8 @@ +spring: + cloud: + schema-registry-client: + endpoint: http://localhost:8990 + schema: + avro: + schema-locations: + - classpath:avro/sensor.avsc \ No newline at end of file diff --git a/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer-two/src/main/resources/avro/sensor.avsc b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer-two/src/main/resources/avro/sensor.avsc new file mode 100644 index 00000000..8d2e6053 --- /dev/null +++ b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer-two/src/main/resources/avro/sensor.avsc @@ -0,0 +1,25 @@ +{ + "namespace" : "com.example", + "type" : "record", + "name" : "Sensor", + "fields" : [ + {"name":"id","type":"string"}, + {"name":"internalTemperature", "type":"float", "default":0.0, "aliases":["temperature"]}, + {"name":"externalTemperature", "type":"float", "default":0.0}, + {"name":"acceleration", "type":"float","default":0.0}, + {"name":"velocity","type":"float","default":0.0}, + {"name":"accelerometer","type":[ + "null",{ + "type":"array", + "items":"float" + } + ]}, + {"name":"magneticField","type":[ + "null",{ + "type":"array", + "items":"float" + } + ]} + ] + +} \ No newline at end of file diff --git a/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer/pom.xml b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer/pom.xml new file mode 100644 index 00000000..db438655 --- /dev/null +++ b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer/pom.xml @@ -0,0 +1,102 @@ + + + + spring-cloud-schema-registry-example + com.jashmore + 3.0.1-SNAPSHOT + + 4.0.0 + + spring-cloud-schema-registry-producer + + Java Dynamic SQS Listener - Spring Starter - Spring Cloud Schema Registry Example - Producer + Includes an example of a service producing messages whose schema is registered in the Spring Cloud Schema Registry. + + + ../../../configuration/spotbugs/bugsExcludeFilter.xml + + + + + org.springframework.boot + spring-boot-starter + + + + org.projectlombok + lombok + provided + + + + com.google.guava + guava + + + + com.jashmore + avro-spring-cloud-schema-registry-sqs-client + ${project.version} + + + + software.amazon.awssdk + sqs + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources/avro + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + com.jashmore.sqs.examples.schemaregistry.ProducerApplication + + + + + repackage + + + + + + org.apache.avro + avro-maven-plugin + + + generate-sources-producer-1 + generate-sources + + schema + + + src/main/resources/avro + + + + + + + \ No newline at end of file diff --git a/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer/src/main/java/com/jashmore/sqs/examples/schemaregistry/MessageProducers.java b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer/src/main/java/com/jashmore/sqs/examples/schemaregistry/MessageProducers.java new file mode 100644 index 00000000..606a6e6f --- /dev/null +++ b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer/src/main/java/com/jashmore/sqs/examples/schemaregistry/MessageProducers.java @@ -0,0 +1,38 @@ +package com.jashmore.sqs.examples.schemaregistry; + +import com.example.Sensor; +import com.jashmore.sqs.registry.AvroSchemaRegistrySqsAsyncClient; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse; + +import java.util.UUID; + +@Slf4j +@Component +@EnableScheduling +@RequiredArgsConstructor +@SuppressWarnings("checkstyle:javadocmethod") +public class MessageProducers { + private final AvroSchemaRegistrySqsAsyncClient sqsAsyncClient; + + @Scheduled(initialDelay = 1000, fixedDelay = 1000) + public void addMessages() { + final Sensor payload = new Sensor("V1-" + UUID.randomUUID().toString(), 1.0F, 1.1F, 1.2F); + + sqsAsyncClient.getQueueUrl((request) -> request.queueName("test")) + .thenApply(GetQueueUrlResponse::queueUrl) + .thenCompose(queueUrl -> sqsAsyncClient.sendAvroMessage( + "ProducerV1", "contentType", payload, requestBuilder -> requestBuilder.queueUrl(queueUrl) + )) + .whenComplete((result, throwable) -> { + if (throwable != null) { + log.error("Error sending message", throwable); + } + log.info("Published message with id {}", payload.getId()); + }); + } +} diff --git a/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer/src/main/java/com/jashmore/sqs/examples/schemaregistry/ProducerApplication.java b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer/src/main/java/com/jashmore/sqs/examples/schemaregistry/ProducerApplication.java new file mode 100644 index 00000000..27a9c936 --- /dev/null +++ b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer/src/main/java/com/jashmore/sqs/examples/schemaregistry/ProducerApplication.java @@ -0,0 +1,58 @@ +package com.jashmore.sqs.examples.schemaregistry; + +import com.google.common.collect.ImmutableList; + +import com.jashmore.sqs.registry.AvroSchemaRegistrySqsAsyncClient; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.schema.registry.avro.AvroMessageConverterProperties; +import org.springframework.cloud.schema.registry.avro.AvroSchemaServiceManager; +import org.springframework.cloud.schema.registry.client.EnableSchemaRegistryClient; +import org.springframework.cloud.schema.registry.client.SchemaRegistryClient; +import org.springframework.context.annotation.Bean; +import org.springframework.core.io.Resource; +import org.springframework.scheduling.annotation.EnableScheduling; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; + +import java.net.URI; +import java.util.List; +import java.util.Optional; + +@SpringBootApplication +@EnableScheduling +@EnableSchemaRegistryClient +@SuppressWarnings("checkstyle:javadocmethod") +public class ProducerApplication { + + public static void main(String[] args) { + SpringApplication.run(ProducerApplication.class); + } + + @Bean + public SqsAsyncClient sqsAsyncClient() { + return SqsAsyncClient.builder() + .endpointOverride(URI.create("http://localhost:9324")) + .region(Region.AP_NORTHEAST_2) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("X", "X"))) + .build(); + } + + @Bean + @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") + public AvroSchemaRegistrySqsAsyncClient avroSchemaRegistrySqsAsyncClient(final SqsAsyncClient delegate, + final SchemaRegistryClient schemaRegistryClient, + final AvroSchemaServiceManager avroSchemaServiceManager, + final AvroMessageConverterProperties avroMessageConverterProperties) { + final List schemaImports = Optional.ofNullable(avroMessageConverterProperties.getSchemaImports()) + .map(ImmutableList::copyOf) + .orElse(ImmutableList.of()); + final List schemaLocations = Optional.ofNullable(avroMessageConverterProperties.getSchemaLocations()) + .map(ImmutableList::copyOf) + .orElse(ImmutableList.of()); + + return new AvroSchemaRegistrySqsAsyncClient(delegate, schemaRegistryClient, avroSchemaServiceManager, schemaImports, schemaLocations); + } +} diff --git a/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer/src/main/resources/application.yml b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer/src/main/resources/application.yml new file mode 100644 index 00000000..66107ca2 --- /dev/null +++ b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer/src/main/resources/application.yml @@ -0,0 +1,8 @@ +spring: + cloud: + schema-registry-client: + endpoint: http://localhost:8990 + schema: + avro: + schema-locations: + - classpath:avro/sensor.avsc diff --git a/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer/src/main/resources/avro/sensor.avsc b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer/src/main/resources/avro/sensor.avsc new file mode 100644 index 00000000..c0e060d3 --- /dev/null +++ b/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer/src/main/resources/avro/sensor.avsc @@ -0,0 +1,11 @@ +{ + "namespace" : "com.example", + "type" : "record", + "name" : "Sensor", + "fields" : [ + {"name":"id","type":"string"}, + {"name":"temperature", "type":"float", "default":0.0}, + {"name":"acceleration", "type":"float","default":0.0}, + {"name":"velocity","type":"float","default":0.0} + ] +} diff --git a/extensions/pom.xml b/extensions/pom.xml new file mode 100644 index 00000000..85f989a8 --- /dev/null +++ b/extensions/pom.xml @@ -0,0 +1,22 @@ + + + + java-dynamic-sqs-listener-parent + com.jashmore + 3.0.1-SNAPSHOT + + 4.0.0 + + extensions + pom + + + Java Dynamic SQS Listener - Extensions + Extra functionality that is built ontop of the library, for example specific Argument Resolvers. + + + spring-cloud-schema-registry-extension + + diff --git a/extensions/spring-cloud-schema-registry-extension/README.md b/extensions/spring-cloud-schema-registry-extension/README.md new file mode 100644 index 00000000..fe5e20c8 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/README.md @@ -0,0 +1,12 @@ +# Spring Cloud Schema Registry Extension +This extension allows the SQS consumer to be able to parse messages that have been serialized using a schema +like [Avro](https://avro.apache.org/docs/1.9.2/gettingstartedjava.html) and these definitions have been stored in the +[Spring Cloud Schema Registry](https://cloud.spring.io/spring-cloud-static/spring-cloud-schema-registry/1.0.0.RC1/reference/html/spring-cloud-schema-registry.html). + +## Why would you want this? +You may want to more easily control how the schema of your messages change during the lifecycle of the application using a tool like the +Spring Cloud Schema Registry. This will allow you to have your producers with different versions of your message schema, but the SQS +consumer can still be able to use this. + +## Examples +To see this in action take a look at the [Spring Cloud Schema Registry Example](../../examples/spring-cloud-schema-registry-example). diff --git a/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/pom.xml b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/pom.xml new file mode 100644 index 00000000..f9dfd5bb --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/pom.xml @@ -0,0 +1,131 @@ + + + + spring-cloud-schema-registry-extension + com.jashmore + 3.0.1-SNAPSHOT + + 4.0.0 + + avro-spring-cloud-schema-registry-extension + + Java Dynamic SQS Listener - Extensions - Spring Cloud Schema Registry Extension - Avro + Apache Avro implementation of the Spring Cloud Schema Registry Extension. + + + ../../../configuration/spotbugs/bugsExcludeFilter.xml + + + + + com.jashmore + java-dynamic-sqs-listener-api + ${project.version} + + + + com.jashmore + spring-cloud-schema-registry-extension-api + ${project.version} + + + + org.apache.avro + avro + + + + org.projectlombok + lombok + provided + + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + org.springframework.boot + spring-boot-starter-test + test + + + + com.jashmore + local-sqs-test-utils-junit5 + ${project.version} + test + + + + com.jashmore + avro-spring-cloud-schema-registry-sqs-client + ${project.version} + test + + + + com.jashmore + java-dynamic-sqs-listener-spring-starter + ${project.version} + test + + + + com.jashmore + in-memory-spring-cloud-schema-registry + ${project.version} + test + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + generate-test-sources + + add-test-source + + + + ${project.build.directory}/generated-test-sources + + + + + + + + org.apache.avro + avro-maven-plugin + + + generate-test-sources + + schema + + + ${project.basedir}/src/test/resources/avro-test-schemas/schema + + ${project.basedir}/src/test/resources/avro-test-schemas/import/author.avsc + + + *.avsc + + ${project.build.directory}/generated-test-sources + String + + + + + + + \ No newline at end of file diff --git a/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroClasspathConsumerSchemaRetriever.java b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroClasspathConsumerSchemaRetriever.java new file mode 100644 index 00000000..ca210be3 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroClasspathConsumerSchemaRetriever.java @@ -0,0 +1,59 @@ +package com.jashmore.sqs.extensions.registry.avro; + +import static java.util.stream.Collectors.toMap; + +import com.jashmore.sqs.extensions.registry.ConsumerSchemaRetriever; +import com.jashmore.sqs.extensions.registry.ConsumerSchemaRetrieverException; +import org.apache.avro.Schema; +import org.apache.avro.SchemaParseException; +import org.springframework.core.io.Resource; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Stream; + +/** + * Implementation that loads all of the schema definitions from resources in the classpath. + * + *

For example it will load all of the schemas in a folder like

resources/avro/{name}.avsc
. + */ +public class AvroClasspathConsumerSchemaRetriever implements ConsumerSchemaRetriever { + private final Map, Schema> classSchemaMap; + + public AvroClasspathConsumerSchemaRetriever(final List schemaImports, + final List schemaLocations) { + final Schema.Parser parser = new Schema.Parser(); + classSchemaMap = Stream.of(schemaImports, schemaLocations) + .filter(Objects::nonNull) + .flatMap(List::stream) + .distinct() + .map(resource -> { + try { + return parser.parse(resource.getInputStream()); + } catch (SchemaParseException | IOException exception) { + throw new AvroSchemaProcessingException("Error processing schema definition: " + resource.getFilename(), exception); + } + }) + .collect(toMap(this::getClassForSchema, Function.identity())); + } + + + @Override + public Schema getSchema(final Class clazz) { + return Optional.ofNullable(classSchemaMap.get(clazz)) + .orElseThrow(() -> new ConsumerSchemaRetrieverException("Could not schema for class: " + clazz.getName())); + } + + private Class getClassForSchema(final Schema schema) { + final String schemaClassName = schema.getNamespace() + "." + schema.getName(); + try { + return Class.forName(schemaClassName); + } catch (ClassNotFoundException classNotFoundException) { + throw new AvroSchemaProcessingException("Could not find class for schema: " + schemaClassName, classNotFoundException); + } + } +} diff --git a/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroMessagePayloadDeserializer.java b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroMessagePayloadDeserializer.java new file mode 100644 index 00000000..fadb1620 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroMessagePayloadDeserializer.java @@ -0,0 +1,39 @@ +package com.jashmore.sqs.extensions.registry.avro; + +import com.jashmore.sqs.extensions.registry.MessagePayloadDeserializer; +import com.jashmore.sqs.extensions.registry.MessagePayloadDeserializerException; +import org.apache.avro.Schema; +import org.springframework.cloud.schema.registry.avro.AvroSchemaServiceManager; +import software.amazon.awssdk.services.sqs.model.Message; + +import java.io.IOException; +import java.util.Base64; +import java.util.function.Function; + +/** + * Deserializes the message into the pojo defined by the consumer Avro schema by transforming it from the serialized representation by + * the producer schema. + */ +public class AvroMessagePayloadDeserializer implements MessagePayloadDeserializer { + private final AvroSchemaServiceManager avroSchemaServiceManager; + private final Function payloadExtractor; + + public AvroMessagePayloadDeserializer(final AvroSchemaServiceManager avroSchemaServiceManager) { + this(avroSchemaServiceManager, (message) -> Base64.getDecoder().decode(message.body())); + } + + public AvroMessagePayloadDeserializer(final AvroSchemaServiceManager avroSchemaServiceManager, + final Function payloadExtractor) { + this.avroSchemaServiceManager = avroSchemaServiceManager; + this.payloadExtractor = payloadExtractor; + } + + @Override + public Object deserialize(final Message message, final Schema producerSchema, final Schema consumerSchema, final Class clazz) { + try { + return avroSchemaServiceManager.readData(clazz, payloadExtractor.apply(message), consumerSchema, producerSchema); + } catch (IOException ioException) { + throw new MessagePayloadDeserializerException(ioException); + } + } +} diff --git a/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroSchemaProcessingException.java b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroSchemaProcessingException.java new file mode 100644 index 00000000..b71c389d --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroSchemaProcessingException.java @@ -0,0 +1,10 @@ +package com.jashmore.sqs.extensions.registry.avro; + +/** + * Exception thrown when there was a problem processing the Avro schema. + */ +public class AvroSchemaProcessingException extends RuntimeException { + public AvroSchemaProcessingException(final String message, final Throwable cause) { + super(message, cause); + } +} diff --git a/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroSchemaRegistryProducerSchemaRetriever.java b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroSchemaRegistryProducerSchemaRetriever.java new file mode 100644 index 00000000..63862c6f --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroSchemaRegistryProducerSchemaRetriever.java @@ -0,0 +1,29 @@ +package com.jashmore.sqs.extensions.registry.avro; + +import com.jashmore.sqs.extensions.registry.ProducerSchemaRetriever; +import com.jashmore.sqs.extensions.registry.ProducerSchemaRetrieverException; +import org.apache.avro.Schema; +import org.springframework.cloud.schema.registry.SchemaReference; +import org.springframework.cloud.schema.registry.client.SchemaRegistryClient; + +/** + * Implementation that uses the Spring Cloud Schema Registry to retrieve the schemas that the producer has sent + * messages using. + */ +public class AvroSchemaRegistryProducerSchemaRetriever implements ProducerSchemaRetriever { + private final SchemaRegistryClient schemaRegistryClient; + + public AvroSchemaRegistryProducerSchemaRetriever(final SchemaRegistryClient schemaRegistryClient) { + this.schemaRegistryClient = schemaRegistryClient; + } + + @Override + public Schema getSchema(final SchemaReference schemaReference) { + try { + final String schemaContent = schemaRegistryClient.fetch(schemaReference); + return new Schema.Parser().parse(schemaContent); + } catch (RuntimeException runtimeException) { + throw new ProducerSchemaRetrieverException(runtimeException); + } + } +} diff --git a/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroSpringCloudSchemaProperties.java b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroSpringCloudSchemaProperties.java new file mode 100644 index 00000000..2f4cbb2f --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroSpringCloudSchemaProperties.java @@ -0,0 +1,32 @@ +package com.jashmore.sqs.extensions.registry.avro; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.core.io.Resource; + +import java.util.List; +import javax.annotation.Nullable; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ConfigurationProperties("spring.cloud.schema.avro") +public class AvroSpringCloudSchemaProperties { + /** + * The list of schema resources that should be loaded before the {@link #schemaLocations} for the + * scenario that these are reliant on other schemas. + */ + @Nullable + private List schemaImports; + /** + * The locations of the schemas to use for this service. + * + *

These schemas can be dependent on other schema's defined in the {@link #schemaImports} property. + */ + @Nullable + private List schemaLocations; +} diff --git a/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroSqsSpringCloudSchemaRegistryConfiguration.java b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroSqsSpringCloudSchemaRegistryConfiguration.java new file mode 100644 index 00000000..aa8e591c --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/AvroSqsSpringCloudSchemaRegistryConfiguration.java @@ -0,0 +1,61 @@ +package com.jashmore.sqs.extensions.registry.avro; + +import com.jashmore.sqs.extensions.registry.ConsumerSchemaRetriever; +import com.jashmore.sqs.extensions.registry.InMemoryCachingProducerSchemaRetriever; +import com.jashmore.sqs.extensions.registry.MessagePayloadDeserializer; +import com.jashmore.sqs.extensions.registry.ProducerSchemaRetriever; +import com.jashmore.sqs.extensions.registry.SchemaReferenceExtractor; +import com.jashmore.sqs.extensions.registry.SpringCloudSchemaArgumentResolver; +import com.jashmore.sqs.extensions.registry.SpringCloudSchemaRegistryPayload; +import com.jashmore.sqs.extensions.registry.SpringCloudSchemaSqsConfiguration; +import org.apache.avro.Schema; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.schema.registry.avro.AvroSchemaServiceManager; +import org.springframework.cloud.schema.registry.client.SchemaRegistryClient; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +@Configuration +@ConditionalOnProperty(value = "spring.cloud.schema.avro.sqs.enabled", matchIfMissing = true) +@EnableConfigurationProperties(AvroSpringCloudSchemaProperties.class) +@Import(SpringCloudSchemaSqsConfiguration.class) +@SuppressWarnings("checkstyle:javadocmethod") +public class AvroSqsSpringCloudSchemaRegistryConfiguration { + + @Bean + @ConditionalOnMissingBean + public ConsumerSchemaRetriever consumerSchemaProvider(final AvroSpringCloudSchemaProperties avroSpringCloudSchemaProperties) { + return new AvroClasspathConsumerSchemaRetriever( + avroSpringCloudSchemaProperties.getSchemaImports(), + avroSpringCloudSchemaProperties.getSchemaLocations() + ); + } + + @Bean + @ConditionalOnMissingBean + public ProducerSchemaRetriever producerSchemaProvider(final SchemaRegistryClient schemaRegistryClient) { + return new InMemoryCachingProducerSchemaRetriever<>( + new AvroSchemaRegistryProducerSchemaRetriever(schemaRegistryClient) + ); + } + + @Bean + @ConditionalOnMissingBean + @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") + public MessagePayloadDeserializer schemaMessageTransformer(final AvroSchemaServiceManager avroSchemaServiceManager) { + return new AvroMessagePayloadDeserializer(avroSchemaServiceManager); + } + + @Bean + @ConditionalOnMissingBean + public SpringCloudSchemaArgumentResolver springCloudSchemaArgumentResolver(final SchemaReferenceExtractor schemaReferenceExtractor, + final ConsumerSchemaRetriever consumerSchemaRetriever, + final ProducerSchemaRetriever producerSchemaProvider, + final MessagePayloadDeserializer messagePayloadDeserializer) { + return new SpringCloudSchemaArgumentResolver<>(schemaReferenceExtractor, consumerSchemaRetriever, producerSchemaProvider, messagePayloadDeserializer, + SpringCloudSchemaRegistryPayload.class); + } +} diff --git a/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/EnableSchemaRegistrySqsExtension.java b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/EnableSchemaRegistrySqsExtension.java new file mode 100644 index 00000000..5d11ce78 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/main/java/com/jashmore/sqs/extensions/registry/avro/EnableSchemaRegistrySqsExtension.java @@ -0,0 +1,17 @@ +package com.jashmore.sqs.extensions.registry.avro; + +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import com.jashmore.sqs.extensions.registry.SpringCloudSchemaSqsConfiguration; +import com.jashmore.sqs.extensions.registry.avro.AvroSqsSpringCloudSchemaRegistryConfiguration; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +@Retention(value = RUNTIME) +@Target(ElementType.TYPE) +@Import({ SpringCloudSchemaSqsConfiguration.class, AvroSqsSpringCloudSchemaRegistryConfiguration.class }) +public @interface EnableSchemaRegistrySqsExtension { +} diff --git a/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/java/com/jashmore/sqs/extensions/registry/avro/AvroClasspathConsumerSchemaRetrieverTest.java b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/java/com/jashmore/sqs/extensions/registry/avro/AvroClasspathConsumerSchemaRetrieverTest.java new file mode 100644 index 00000000..f1781dc0 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/java/com/jashmore/sqs/extensions/registry/avro/AvroClasspathConsumerSchemaRetrieverTest.java @@ -0,0 +1,94 @@ +package com.jashmore.sqs.extensions.registry.avro; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.google.common.collect.ImmutableList; + +import com.jashmore.sqs.extensions.registry.ConsumerSchemaRetrieverException; +import com.jashmore.sqs.extensions.registry.model.Author; +import com.jashmore.sqs.extensions.registry.model.Book; +import org.apache.avro.Schema; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.springframework.core.io.ClassPathResource; + +class AvroClasspathConsumerSchemaRetrieverTest { + @Nested + class SchemaParsing { + @Test + void creatingSchemaForResourceThatDoesNotExistThrowsException() { + final AvroSchemaProcessingException exception = assertThrows(AvroSchemaProcessingException.class, () -> new AvroClasspathConsumerSchemaRetriever( + ImmutableList.of(new ClassPathResource("unknown/schema.avsc")), + ImmutableList.of() + )); + + assertThat(exception).hasMessage("Error processing schema definition: schema.avsc"); + } + + @Test + void schemaThatHasNotHadTheJavaFileGeneratedWillReturnError() { + final AvroSchemaProcessingException exception = assertThrows(AvroSchemaProcessingException.class, () -> new AvroClasspathConsumerSchemaRetriever( + ImmutableList.of(new ClassPathResource("avro-non-generated-test-schemas/non-built-schema.avsc")), + ImmutableList.of() + )); + + assertThat(exception).hasMessage("Could not find class for schema: com.jashmore.sqs.extensions.registry.model.NonBuiltSchema"); + } + + @Test + void duplicateSchemaDefinitionsAreIgnored() { + new AvroClasspathConsumerSchemaRetriever( + ImmutableList.of( + new ClassPathResource("avro-test-schemas/import/author.avsc"), new ClassPathResource("avro-test-schemas/import/author.avsc") + ), + ImmutableList.of( + new ClassPathResource("avro-test-schemas/schema/book.avsc"), new ClassPathResource("avro-test-schemas/schema/book.avsc") + ) + ); + } + + @Test + void missingDependentSchemasWillThrowExceptionInParsing() { + final AvroSchemaProcessingException exception = assertThrows(AvroSchemaProcessingException.class,() -> new AvroClasspathConsumerSchemaRetriever( + ImmutableList.of(), + ImmutableList.of(new ClassPathResource("avro-test-schemas/schema/book.avsc")) + )); + + assertThat(exception).hasMessage("Error processing schema definition: book.avsc"); + } + } + + @Nested + class GetSchema { + private AvroClasspathConsumerSchemaRetriever avroClasspathConsumerSchemaRetriever; + + @BeforeEach + void setUp() { + avroClasspathConsumerSchemaRetriever = new AvroClasspathConsumerSchemaRetriever( + ImmutableList.of(new ClassPathResource("avro-test-schemas/import/author.avsc")), + ImmutableList.of(new ClassPathResource("avro-test-schemas/schema/book.avsc")) + ); + } + + @Test + void canObtainSchemaForLeafSchema() { + final Schema schema = avroClasspathConsumerSchemaRetriever.getSchema(Author.class); + + assertThat(schema).isNotNull(); + } + + @Test + void canObtainSchemaForSchemaWithChildrenSchema() { + final Schema schema = avroClasspathConsumerSchemaRetriever.getSchema(Book.class); + + assertThat(schema).isNotNull(); + } + + @Test + void obtainingSchemaForClassThatDoesNotExistThrowsException() { + assertThrows(ConsumerSchemaRetrieverException.class, () -> avroClasspathConsumerSchemaRetriever.getSchema(String.class)); + } + } +} \ No newline at end of file diff --git a/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/java/com/jashmore/sqs/extensions/registry/avro/AvroMessagePayloadDeserializerTest.java b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/java/com/jashmore/sqs/extensions/registry/avro/AvroMessagePayloadDeserializerTest.java new file mode 100644 index 00000000..cd648402 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/java/com/jashmore/sqs/extensions/registry/avro/AvroMessagePayloadDeserializerTest.java @@ -0,0 +1,77 @@ +package com.jashmore.sqs.extensions.registry.avro; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.jashmore.sqs.extensions.registry.MessagePayloadDeserializerException; +import org.apache.avro.Schema; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.cloud.schema.registry.avro.AvroSchemaServiceManager; +import software.amazon.awssdk.services.sqs.model.Message; + +import java.io.IOException; +import java.util.Base64; + +@ExtendWith(MockitoExtension.class) +class AvroMessagePayloadDeserializerTest { + + @Mock + private AvroSchemaServiceManager avroSchemaServiceManager; + + @Test + void canDeserializePayloadOfMessage() throws IOException { + // arrange + final byte[] payloadAsBytes = "some message".getBytes(); + final Message message = Message.builder() + .body(Base64.getEncoder().encodeToString(payloadAsBytes)) + .build(); + when(avroSchemaServiceManager.readData(eq(String.class), eq(payloadAsBytes), any(Schema.class), any(Schema.class))).thenReturn("Result"); + + // act + final Object deserializedObject = new AvroMessagePayloadDeserializer(avroSchemaServiceManager) + .deserialize(message, mock(Schema.class), mock(Schema.class), String.class); + + // assert + assertThat(deserializedObject).isEqualTo("Result"); + } + + @Test + void appliesDeserializingFunctionToMessageContent() throws IOException { + // arrange + final byte[] transformedBytes = {1, 2}; + final Message message = Message.builder() + .body("payload") + .build(); + when(avroSchemaServiceManager.readData(eq(String.class), eq(transformedBytes), any(Schema.class), any(Schema.class))).thenReturn("Result"); + + // act + final Object deserializedObject = new AvroMessagePayloadDeserializer(avroSchemaServiceManager, (m) -> transformedBytes) + .deserialize(message, mock(Schema.class), mock(Schema.class), String.class); + + // assert + assertThat(deserializedObject).isEqualTo("Result"); + } + + @Test + void exceptionDeserializingPayloadThrowsException() throws IOException { + // arrange + final byte[] payloadAsBytes = "some message".getBytes(); + final Message message = Message.builder() + .body(Base64.getEncoder().encodeToString(payloadAsBytes)) + .build(); + when(avroSchemaServiceManager.readData(eq(String.class), any(), any(Schema.class), any(Schema.class))) + .thenThrow(IOException.class); + final AvroMessagePayloadDeserializer avroMessagePayloadDeserializer = new AvroMessagePayloadDeserializer(avroSchemaServiceManager); + + // act + assertThrows(MessagePayloadDeserializerException.class, + () -> avroMessagePayloadDeserializer.deserialize(message, mock(Schema.class), mock(Schema.class), String.class)); + } +} \ No newline at end of file diff --git a/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/java/com/jashmore/sqs/extensions/registry/avro/AvroSchemaRegistryProducerSchemaRetrieverTest.java b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/java/com/jashmore/sqs/extensions/registry/avro/AvroSchemaRegistryProducerSchemaRetrieverTest.java new file mode 100644 index 00000000..d473cc4e --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/java/com/jashmore/sqs/extensions/registry/avro/AvroSchemaRegistryProducerSchemaRetrieverTest.java @@ -0,0 +1,66 @@ +package com.jashmore.sqs.extensions.registry.avro; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +import com.jashmore.sqs.extensions.registry.ProducerSchemaRetrieverException; +import org.apache.avro.Schema; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.cloud.schema.registry.SchemaReference; +import org.springframework.cloud.schema.registry.client.SchemaRegistryClient; +import org.springframework.core.io.ClassPathResource; + +import java.io.IOException; + +@ExtendWith(MockitoExtension.class) +class AvroSchemaRegistryProducerSchemaRetrieverTest { + private static final Schema.Parser SCHEMA_PARSER = new Schema.Parser(); + private static final Schema SCHEMA; + + @Mock + private SchemaRegistryClient schemaRegistryClient; + + private AvroSchemaRegistryProducerSchemaRetriever avroSchemaRegistryProducerSchemaRetriever; + + static { + final ClassPathResource resource = new ClassPathResource("avro-test-schemas/import/author.avsc"); + try { + SCHEMA = SCHEMA_PARSER.parse(resource.getInputStream()); + } catch (IOException ioException) { + throw new RuntimeException("Error parsing schema: " + resource.getFilename(), ioException); + } + } + + @BeforeEach + void setUp() { + avroSchemaRegistryProducerSchemaRetriever = new AvroSchemaRegistryProducerSchemaRetriever(schemaRegistryClient); + } + + @Test + void canObtainSchemaFromRegistry() { + // arrange + final SchemaReference schemaReference = new SchemaReference("subject", 1, "format"); + when(schemaRegistryClient.fetch(schemaReference)).thenReturn(SCHEMA.toString()); + + // act + final Schema schema = avroSchemaRegistryProducerSchemaRetriever.getSchema(schemaReference); + + // assert + assertThat(schema).isEqualTo(SCHEMA); + } + + @Test + void errorParsingSchemaWillThrowException() { + // arrange + final SchemaReference schemaReference = new SchemaReference("subject", 1, "format"); + when(schemaRegistryClient.fetch(schemaReference)).thenReturn("invalid Avro schema"); + + // act + assertThrows(ProducerSchemaRetrieverException.class, () -> avroSchemaRegistryProducerSchemaRetriever.getSchema(schemaReference)); + } +} \ No newline at end of file diff --git a/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/java/it/com/jashmore/sqs/extensions/registry/avro/AvroSpringCloudSchemaRegistryIntegrationTest.java b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/java/it/com/jashmore/sqs/extensions/registry/avro/AvroSpringCloudSchemaRegistryIntegrationTest.java new file mode 100644 index 00000000..e68bae24 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/java/it/com/jashmore/sqs/extensions/registry/avro/AvroSpringCloudSchemaRegistryIntegrationTest.java @@ -0,0 +1,97 @@ +package it.com.jashmore.sqs.extensions.registry.avro; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.jashmore.sqs.extensions.registry.SpringCloudSchemaRegistryPayload; +import com.jashmore.sqs.extensions.registry.avro.AvroSpringCloudSchemaProperties; +import com.jashmore.sqs.extensions.registry.avro.EnableSchemaRegistrySqsExtension; +import com.jashmore.sqs.extensions.registry.model.Author; +import com.jashmore.sqs.extensions.registry.model.Book; +import com.jashmore.sqs.registry.AvroSchemaRegistrySqsAsyncClient; +import com.jashmore.sqs.spring.container.basic.QueueListener; +import com.jashmore.sqs.test.LocalSqsExtension; +import com.jashmore.sqs.util.LocalSqsAsyncClient; +import com.jashmore.sqs.extensions.registry.InMemorySchemaRegistryClient; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.schema.registry.avro.AvroSchemaServiceManager; +import org.springframework.cloud.schema.registry.client.SchemaRegistryClient; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +@SpringBootTest(classes = AvroSpringCloudSchemaRegistryIntegrationTest.Application.class) +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") +public class AvroSpringCloudSchemaRegistryIntegrationTest { + private static final String QUEUE_NAME = "test"; + + @RegisterExtension + static final LocalSqsExtension LOCAL_SQS_RULE = new LocalSqsExtension(QUEUE_NAME); + + @Autowired + private AvroSchemaServiceManager avroSchemaServiceManager; + + @Autowired + private AvroSpringCloudSchemaProperties avroSpringCloudSchemaProperties; + + @Autowired + private SchemaRegistryClient schemaRegistryClient; + + static final AtomicReference RECEIVED_BOOK = new AtomicReference<>(); + static final CountDownLatch MESSAGE_RECEIVED_LATCH = new CountDownLatch(1); + + @SpringBootApplication + @EnableSchemaRegistrySqsExtension + public static class Application { + public static void main(String[] args) { + SpringApplication.run(Application.class); + } + + @Bean + @Primary + public InMemorySchemaRegistryClient inMemorySchemaRegistryClient() { + return new InMemorySchemaRegistryClient(); + } + + @Bean + public LocalSqsAsyncClient localSqsAsyncClient() { + return LOCAL_SQS_RULE.getLocalAmazonSqsAsync(); + } + + @SuppressWarnings("unused") + @QueueListener(value = "test") + public void myMethod(@SpringCloudSchemaRegistryPayload Book book) { + RECEIVED_BOOK.set(book); + MESSAGE_RECEIVED_LATCH.countDown(); + } + } + + @Test + void name() throws ExecutionException, InterruptedException { + // arrange + final AvroSchemaRegistrySqsAsyncClient avroClient = new AvroSchemaRegistrySqsAsyncClient( + LOCAL_SQS_RULE.getLocalAmazonSqsAsync(), + schemaRegistryClient, + avroSchemaServiceManager, + avroSpringCloudSchemaProperties.getSchemaImports(), + avroSpringCloudSchemaProperties.getSchemaLocations() + ); + final String queueUrl = avroClient.getQueueUrl(builder -> builder.queueName(QUEUE_NAME)).get().queueUrl(); + final Book book = new Book("id", "name", new Author("firstname", "lastname")); + + // act + avroClient.sendAvroMessage("prefix", "contentType", book, builder -> builder.queueUrl(queueUrl)); + MESSAGE_RECEIVED_LATCH.await(5, TimeUnit.SECONDS); + + // assert + assertThat(RECEIVED_BOOK.get()).isEqualTo(book); + } +} diff --git a/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/resources/application.yml b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/resources/application.yml new file mode 100644 index 00000000..cf5db696 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/resources/application.yml @@ -0,0 +1,10 @@ +spring: + cloud: + schema-registry-client: + endpoint: http://localhost:8990 + schema: + avro: + schema-imports: + - classpath:avro-test-schemas/import/author.avsc + schema-locations: + - classpath:avro-test-schemas/schema/book.avsc diff --git a/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/resources/avro-non-generated-test-schemas/non-built-schema.avsc b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/resources/avro-non-generated-test-schemas/non-built-schema.avsc new file mode 100644 index 00000000..f5a3eaf6 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/resources/avro-non-generated-test-schemas/non-built-schema.avsc @@ -0,0 +1,9 @@ +{ + "namespace" : "com.jashmore.sqs.extensions.registry.model", + "type" : "record", + "name" : "NonBuiltSchema", + "fields" : [ + { "name": "firstname", "type": "string" }, + { "name": "lastname", "type": "string" } + ] +} \ No newline at end of file diff --git a/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/resources/avro-test-schemas/import/author.avsc b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/resources/avro-test-schemas/import/author.avsc new file mode 100644 index 00000000..b8379ea6 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/resources/avro-test-schemas/import/author.avsc @@ -0,0 +1,9 @@ +{ + "namespace" : "com.jashmore.sqs.extensions.registry.model", + "type" : "record", + "name" : "Author", + "fields" : [ + { "name": "firstname", "type": "string" }, + { "name": "lastname", "type": "string" } + ] +} \ No newline at end of file diff --git a/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/resources/avro-test-schemas/schema/book.avsc b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/resources/avro-test-schemas/schema/book.avsc new file mode 100644 index 00000000..edc20f88 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/avro-spring-cloud-schema-registry-extension/src/test/resources/avro-test-schemas/schema/book.avsc @@ -0,0 +1,10 @@ +{ + "namespace" : "com.jashmore.sqs.extensions.registry.model", + "type" : "record", + "name" : "Book", + "fields" : [ + { "name":"id","type":"string" }, + { "name":"name","type":"string" }, + { "name":"author","type":"Author" } + ] +} diff --git a/extensions/spring-cloud-schema-registry-extension/in-memory-spring-cloud-schema-registry/pom.xml b/extensions/spring-cloud-schema-registry-extension/in-memory-spring-cloud-schema-registry/pom.xml new file mode 100644 index 00000000..2d91c6eb --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/in-memory-spring-cloud-schema-registry/pom.xml @@ -0,0 +1,35 @@ + + + + spring-cloud-schema-registry-extension + com.jashmore + 3.0.1-SNAPSHOT + + 4.0.0 + + in-memory-spring-cloud-schema-registry + + Java Dynamic SQS Listener - Extensions - Spring Cloud Schema Registry Extension - In Memory Spring Cloud Schema Registry + In Memory implementation of the Spring Cloud Schema Registry that is used for testing purposes. + + + + org.springframework.cloud + spring-cloud-schema-registry-client + + + org.apache.avro + avro + + + + + + org.projectlombok + lombok + provided + + + \ No newline at end of file diff --git a/extensions/spring-cloud-schema-registry-extension/in-memory-spring-cloud-schema-registry/src/main/java/com/jashmore/sqs/extensions/registry/InMemorySchemaRegistryClient.java b/extensions/spring-cloud-schema-registry-extension/in-memory-spring-cloud-schema-registry/src/main/java/com/jashmore/sqs/extensions/registry/InMemorySchemaRegistryClient.java new file mode 100644 index 00000000..70e0a78c --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/in-memory-spring-cloud-schema-registry/src/main/java/com/jashmore/sqs/extensions/registry/InMemorySchemaRegistryClient.java @@ -0,0 +1,58 @@ +package com.jashmore.sqs.extensions.registry; + +import lombok.Builder; +import lombok.Value; +import org.springframework.cloud.schema.registry.SchemaReference; +import org.springframework.cloud.schema.registry.SchemaRegistrationResponse; +import org.springframework.cloud.schema.registry.client.SchemaRegistryClient; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.concurrent.ThreadSafe; + +/** + * In Memory implementation of the {@link SchemaRegistryClient} that is used for mocking this service in Integration Tests. + */ +@ThreadSafe +public class InMemorySchemaRegistryClient implements SchemaRegistryClient { + private final Map schemas = new ConcurrentHashMap<>(); + + private final AtomicInteger schemaVersionNumber = new AtomicInteger(1); + private final AtomicInteger schemaId = new AtomicInteger(1); + + @Override + public SchemaRegistrationResponse register(final String subject, final String format, final String schema) { + final SchemaReference schemaReference = new SchemaReference(subject, schemaVersionNumber.getAndIncrement(), format); + final SchemaDetails schemaDetails = SchemaDetails.builder() + .id(schemaId.getAndIncrement()) + .schemaDefinition(schema) + .build(); + schemas.put(schemaReference, schemaDetails); + final SchemaRegistrationResponse response = new SchemaRegistrationResponse(); + response.setSchemaReference(schemaReference); + response.setId(schemaDetails.id); + return response; + } + + @Override + public String fetch(final SchemaReference schemaReference) { + return schemas.get(schemaReference).schemaDefinition; + } + + @Override + public String fetch(final int id) { + return schemas.values().stream() + .filter(schemaDetails -> schemaDetails.id == id) + .map(schemaDetails -> schemaDetails.schemaDefinition) + .findFirst() + .orElseThrow(() -> new RuntimeException("Could not find schema definition with ID " + id)); + } + + @Value + @Builder + private static class SchemaDetails { + int id; + String schemaDefinition; + } +} diff --git a/extensions/spring-cloud-schema-registry-extension/pom.xml b/extensions/spring-cloud-schema-registry-extension/pom.xml new file mode 100644 index 00000000..965eb4f7 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/pom.xml @@ -0,0 +1,24 @@ + + + + extensions + com.jashmore + 3.0.1-SNAPSHOT + + 4.0.0 + + spring-cloud-schema-registry-extension + pom + + + Java Dynamic SQS Listener - Extensions - Spring Cloud Schema Registry + Extension for creating an Argument Resolver that can deserialize message payloads using schemas defined in the Spring Cloud Schema Registry. + + + spring-cloud-schema-registry-extension-api + avro-spring-cloud-schema-registry-extension + in-memory-spring-cloud-schema-registry + + diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/README.md b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/README.md new file mode 100644 index 00000000..e1197c18 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/README.md @@ -0,0 +1,4 @@ +# Spring Cloud Schema Registry Extension +This extension parses the payload of a message that has been serialized by a tool such as an Apache Avro. This is the basic API and does not contain +the implementation details on how to obtain the schemas or how to serialize the payload with these schemas. See the implementations for specific +details, such as the [avro-spring-cloud-schema-registry-extension](../avro-spring-cloud-schema-registry-extension). \ No newline at end of file diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/pom.xml b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/pom.xml new file mode 100644 index 00000000..b60f1607 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/pom.xml @@ -0,0 +1,52 @@ + + + + spring-cloud-schema-registry-extension + com.jashmore + 3.0.1-SNAPSHOT + + 4.0.0 + + spring-cloud-schema-registry-extension-api + + Java Dynamic SQS Listener - Extensions - Spring Cloud Schema Registry Extension - API + API for building a payload parser that has been serialized via the schema registered in the Spring Cloud Schema Registry. + + + ../../../configuration/spotbugs/bugsExcludeFilter.xml + + + + + com.jashmore + java-dynamic-sqs-listener-api + ${project.version} + + + + com.jashmore + common-utils + ${project.version} + + + + org.springframework.cloud + spring-cloud-schema-registry-client + + + + org.apache.avro + avro + + + + + + org.projectlombok + lombok + provided + + + \ No newline at end of file diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/ConsumerSchemaRetriever.java b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/ConsumerSchemaRetriever.java new file mode 100644 index 00000000..4d514d6c --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/ConsumerSchemaRetriever.java @@ -0,0 +1,23 @@ +package com.jashmore.sqs.extensions.registry; + +import javax.annotation.concurrent.ThreadSafe; + +/** + * Retriever used to get the schema representation of this class that will deserialize the message payload. + * + * @param the type for the schema, for example an Avro schema + */ +@ThreadSafe +@FunctionalInterface +public interface ConsumerSchemaRetriever { + /** + * Get the schema representation for the provided class. + * + *

This will be used by the {@link MessagePayloadDeserializer} to determine how it can transform the message + * that may be have been produced with a different version of the schema. + * + * @param clazz the class of the object to get the schema for + * @return the schema definition for this class + */ + T getSchema(Class clazz); +} diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/ConsumerSchemaRetrieverException.java b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/ConsumerSchemaRetrieverException.java new file mode 100644 index 00000000..a3598c75 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/ConsumerSchemaRetrieverException.java @@ -0,0 +1,11 @@ +package com.jashmore.sqs.extensions.registry; + +/** + * Exception thrown if there is a problem getting the consumer schema. + */ +public class ConsumerSchemaRetrieverException extends RuntimeException { + + public ConsumerSchemaRetrieverException(final String message) { + super(message); + } +} diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/InMemoryCachingProducerSchemaRetriever.java b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/InMemoryCachingProducerSchemaRetriever.java new file mode 100644 index 00000000..fca59bfc --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/InMemoryCachingProducerSchemaRetriever.java @@ -0,0 +1,26 @@ +package com.jashmore.sqs.extensions.registry; + +import org.springframework.cloud.schema.registry.SchemaReference; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.concurrent.ThreadSafe; + +/** + * In memory cache implementation that can be used to reduce the number of times that the schema + * is calculated (which would be a lot). + */ +@ThreadSafe +public class InMemoryCachingProducerSchemaRetriever implements ProducerSchemaRetriever { + private final Map cache = new ConcurrentHashMap<>(); + private final ProducerSchemaRetriever delegate; + + public InMemoryCachingProducerSchemaRetriever(final ProducerSchemaRetriever delegate) { + this.delegate = delegate; + } + + @Override + public T getSchema(final SchemaReference reference) { + return cache.computeIfAbsent(reference, delegate::getSchema); + } +} diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/MessageAttributeSchemaReferenceExtractor.java b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/MessageAttributeSchemaReferenceExtractor.java new file mode 100644 index 00000000..14939248 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/MessageAttributeSchemaReferenceExtractor.java @@ -0,0 +1,54 @@ +package com.jashmore.sqs.extensions.registry; + +import org.springframework.cloud.schema.registry.SchemaReference; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Obtains the MimeType that contains information about the schema of the payload ({@link SchemaReference}) by looking + * in the message attribute of the message. + */ +public class MessageAttributeSchemaReferenceExtractor implements SchemaReferenceExtractor { + private static final Pattern MIME_TYPE_PATTERN = Pattern.compile("application/([^.]+)\\.([\\p{Alnum}\\$\\.]+)\\.v(\\p{Digit}+)\\+([^.]+)"); + public static final String CONTENT_TYPE_MESSAGE_ATTRIBUTE_NAME = "contentType"; + + private final String attributeName; + + public MessageAttributeSchemaReferenceExtractor() { + this(CONTENT_TYPE_MESSAGE_ATTRIBUTE_NAME); + } + + /** + * Create the extractor using the provided Message Attribute name. + * + * @param attributeName the attribute name in the SQS message that will contain the content type + */ + public MessageAttributeSchemaReferenceExtractor(final String attributeName) { + this.attributeName = attributeName; + } + + @Override + public SchemaReference extract(final Message message) { + final MessageAttributeValue contentTypeAttribute = message.messageAttributes().get(attributeName); + if (contentTypeAttribute == null) { + throw new SchemaReferenceExtractorException("No attribute found with name: " + attributeName); + } + + if (!contentTypeAttribute.dataType().equals("String")) { + throw new SchemaReferenceExtractorException("Attribute expected to be a String but is of type: " + contentTypeAttribute.dataType()); + } + + final Matcher matcher = MIME_TYPE_PATTERN.matcher(contentTypeAttribute.stringValue()); + if (!matcher.matches()) { + throw new SchemaReferenceExtractorException("Content type attribute value is not in the expected format: " + contentTypeAttribute.stringValue()); + } + + final String subject = matcher.group(2); + final String version = matcher.group(3); + final String format = matcher.group(4); + return new SchemaReference(subject, Integer.parseInt(version), format); + } +} diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/MessagePayloadDeserializer.java b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/MessagePayloadDeserializer.java new file mode 100644 index 00000000..53c3d783 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/MessagePayloadDeserializer.java @@ -0,0 +1,34 @@ +package com.jashmore.sqs.extensions.registry; + +import software.amazon.awssdk.services.sqs.model.Message; + +import javax.annotation.concurrent.ThreadSafe; + +/** + * Used to deserialize the payload of a message into a POJO represented by the given schemas. + * + * @param the type of the schema used to represent the object + */ +@ThreadSafe +@FunctionalInterface +public interface MessagePayloadDeserializer { + /** + * Deserialize the message body of the message to the required object type. + * + *

This will take the content of the message body that was serialized in the schema defined in the producer schema + * and deserialize it to this consumer's schema, returning the new object. + * + *

For example, in the producer schema there may have only been a temperature field but in the new consumer schema there + * is an internal and external temperature. For this case the new schema would have definitions of how to take the old schema + * to produce the new schema, e.g. just put the temperature into the internal temperature and set external temperature as + * zero. + * + * @param message the message to deserialize + * @param producerSchema the schema of the payload that was sent by the producer + * @param consumerSchema the schema that the consumer can handle + * @param clazz the class of the object to deserialize to + * @return the deserialized body + * @throws MessagePayloadDeserializerException when there was an error deserializing + */ + Object deserialize(Message message, T producerSchema, T consumerSchema, Class clazz) throws MessagePayloadDeserializerException; +} diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/MessagePayloadDeserializerException.java b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/MessagePayloadDeserializerException.java new file mode 100644 index 00000000..b1ff9080 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/MessagePayloadDeserializerException.java @@ -0,0 +1,10 @@ +package com.jashmore.sqs.extensions.registry; + +/** + * Exception thrown when there was an error trying to deserialize the message payload to the required schema. + */ +public class MessagePayloadDeserializerException extends RuntimeException { + public MessagePayloadDeserializerException(final Throwable cause) { + super(cause); + } +} diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/ProducerSchemaRetriever.java b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/ProducerSchemaRetriever.java new file mode 100644 index 00000000..33466025 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/ProducerSchemaRetriever.java @@ -0,0 +1,26 @@ +package com.jashmore.sqs.extensions.registry; + +import org.springframework.cloud.schema.registry.SchemaReference; + +import javax.annotation.concurrent.ThreadSafe; + +/** + * Used to obtain the schema for a message that was sent from a producer. + * + * @param the type of the schema, for example an Avro schema + */ +@ThreadSafe +@FunctionalInterface +public interface ProducerSchemaRetriever { + /** + * Given the schema reference, obtain the Schema that represents the payload that was sent from the producer. + * + *

For example, given that it is an object of type "Signal.v2", return the schema for this so the consumer + * will know how to transform this to the version that they have defined, e.g. "Signal.v3". + * + * @param reference the reference to the schema that the producer was using + * @return the schema definition + * @throws ProducerSchemaRetrieverException when there was an error getting the schema + */ + T getSchema(SchemaReference reference) throws ProducerSchemaRetrieverException; +} diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/ProducerSchemaRetrieverException.java b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/ProducerSchemaRetrieverException.java new file mode 100644 index 00000000..2ef50a8e --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/ProducerSchemaRetrieverException.java @@ -0,0 +1,11 @@ +package com.jashmore.sqs.extensions.registry; + +/** + * Exception thrown when there was an error trying to obtain the schema of the message that the producer + * used to publish the message. + */ +public class ProducerSchemaRetrieverException extends RuntimeException { + public ProducerSchemaRetrieverException(final Throwable cause) { + super(cause); + } +} diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/SchemaReferenceExtractor.java b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/SchemaReferenceExtractor.java new file mode 100644 index 00000000..8963c98e --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/SchemaReferenceExtractor.java @@ -0,0 +1,21 @@ +package com.jashmore.sqs.extensions.registry; + +import org.springframework.cloud.schema.registry.SchemaReference; +import software.amazon.awssdk.services.sqs.model.Message; + +import javax.annotation.concurrent.ThreadSafe; + +@ThreadSafe +@FunctionalInterface +public interface SchemaReferenceExtractor { + /** + * Obtain the {@link SchemaReference} from the message to use for determining what schema should be used to deserialize the message. + * + *

This could be obtained by looking at the content type of the {@link Message}, for example the {@link MessageAttributeSchemaReferenceExtractor} + * which looks at the message attribute of the message to get the version, etc. + * + * @param message the message to process + * @return the schema reference for this message + */ + SchemaReference extract(Message message); +} diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/SchemaReferenceExtractorException.java b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/SchemaReferenceExtractorException.java new file mode 100644 index 00000000..9ff204ad --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/SchemaReferenceExtractorException.java @@ -0,0 +1,10 @@ +package com.jashmore.sqs.extensions.registry; + +/** + * Exception for when there was a problem determining the version of the schema for the received message. + */ +public class SchemaReferenceExtractorException extends RuntimeException { + public SchemaReferenceExtractorException(final String message) { + super(message); + } +} diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/SpringCloudSchemaArgumentResolver.java b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/SpringCloudSchemaArgumentResolver.java new file mode 100644 index 00000000..41d8f5c3 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/SpringCloudSchemaArgumentResolver.java @@ -0,0 +1,68 @@ +package com.jashmore.sqs.extensions.registry; + +import com.jashmore.sqs.QueueProperties; +import com.jashmore.sqs.argument.ArgumentResolutionException; +import com.jashmore.sqs.argument.ArgumentResolver; +import com.jashmore.sqs.argument.MethodParameter; +import com.jashmore.sqs.util.annotation.AnnotationUtils; +import org.springframework.cloud.schema.registry.SchemaReference; +import software.amazon.awssdk.services.sqs.model.Message; + +import java.lang.annotation.Annotation; + +/** + * Argument resolver for taking messages that were serialized using a schema versioning tool like Apache Avro. + * + *

This will obtain the schema of the object that the producer used to serialize the object and the schema that the + * consumer can consume and serialize the message payload between these versions. + * + * @param the spring cloud registry schema type used to resolve this argument + */ +public class SpringCloudSchemaArgumentResolver implements ArgumentResolver { + private final SchemaReferenceExtractor schemaReferenceExtractor; + private final ConsumerSchemaRetriever consumerSchemaRetriever; + private final ProducerSchemaRetriever producerSchemaRetriever; + private final MessagePayloadDeserializer messagePayloadDeserializer; + private final Class annotationClass; + + /** + * Constructor. + * + * @param schemaReferenceExtractor used to obtain the {@link SchemaReference} of this message payload + * @param consumerSchemaRetriever used to obtain the schema that the consumer knows for this type + * @param producerSchemaRetriever used to obtain the schema that the producer used to send the message + * @param messagePayloadDeserializer used to deserialize the message payload using the schemas for the message + * @param annotationClass the annotation that should be used to trigger this argument resolver + */ + public SpringCloudSchemaArgumentResolver(final SchemaReferenceExtractor schemaReferenceExtractor, + final ConsumerSchemaRetriever consumerSchemaRetriever, + final ProducerSchemaRetriever producerSchemaRetriever, + final MessagePayloadDeserializer messagePayloadDeserializer, + final Class annotationClass) { + this.schemaReferenceExtractor = schemaReferenceExtractor; + this.consumerSchemaRetriever = consumerSchemaRetriever; + this.producerSchemaRetriever = producerSchemaRetriever; + this.messagePayloadDeserializer = messagePayloadDeserializer; + this.annotationClass = annotationClass; + } + + @Override + public boolean canResolveParameter(final MethodParameter methodParameter) { + return AnnotationUtils.findParameterAnnotation(methodParameter, annotationClass).isPresent(); + } + + @Override + public Object resolveArgumentForParameter(final QueueProperties queueProperties, + final MethodParameter methodParameter, + final Message message) throws ArgumentResolutionException { + try { + final Class clazz = methodParameter.getParameter().getType(); + final SchemaReference schemaReference = schemaReferenceExtractor.extract(message); + final T producerSchema = producerSchemaRetriever.getSchema(schemaReference); + final T consumerSchema = consumerSchemaRetriever.getSchema(clazz); + return messagePayloadDeserializer.deserialize(message, producerSchema, consumerSchema, clazz); + } catch (RuntimeException runtimeException) { + throw new ArgumentResolutionException(runtimeException); + } + } +} diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/SpringCloudSchemaRegistryPayload.java b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/SpringCloudSchemaRegistryPayload.java new file mode 100644 index 00000000..24d5cfbf --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/SpringCloudSchemaRegistryPayload.java @@ -0,0 +1,19 @@ +package com.jashmore.sqs.extensions.registry; + +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +/** + * Default annotation for a message consumer's parameter that indicate that it should be resolved to the payload of the message using schemas defined + * in the Spring Cloud Schema Registry. + * + *

Parameters marked with this annotation do not have a type restriction but the type of the parameter must be able to be map + * from the message body via the Schema, such as an Avro schema. + */ +@Retention(value = RUNTIME) +@Target(ElementType.PARAMETER) +public @interface SpringCloudSchemaRegistryPayload { +} diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/SpringCloudSchemaSqsConfiguration.java b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/SpringCloudSchemaSqsConfiguration.java new file mode 100644 index 00000000..b18f3464 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/main/java/com/jashmore/sqs/extensions/registry/SpringCloudSchemaSqsConfiguration.java @@ -0,0 +1,17 @@ +package com.jashmore.sqs.extensions.registry; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.cloud.schema.registry.client.EnableSchemaRegistryClient; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@EnableSchemaRegistryClient +@SuppressWarnings("checkstyle:javadocmethod") +public class SpringCloudSchemaSqsConfiguration { + @Bean + @ConditionalOnMissingBean(SchemaReferenceExtractor.class) + public SchemaReferenceExtractor schemaReferenceExtractor() { + return new MessageAttributeSchemaReferenceExtractor(); + } +} diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/test/java/com/jashmore/sqs/extensions/registry/InMemoryCachingProducerSchemaRetrieverTest.java b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/test/java/com/jashmore/sqs/extensions/registry/InMemoryCachingProducerSchemaRetrieverTest.java new file mode 100644 index 00000000..95d5a42e --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/test/java/com/jashmore/sqs/extensions/registry/InMemoryCachingProducerSchemaRetrieverTest.java @@ -0,0 +1,80 @@ +package com.jashmore.sqs.extensions.registry; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.cloud.schema.registry.SchemaReference; + +@ExtendWith(MockitoExtension.class) +class InMemoryCachingProducerSchemaRetrieverTest { + + @Mock + private ProducerSchemaRetriever mockDelegate; + + private InMemoryCachingProducerSchemaRetriever producerSchemaRetriever; + + @BeforeEach + void setUp() { + producerSchemaRetriever = new InMemoryCachingProducerSchemaRetriever<>(mockDelegate); + } + + @Test + void cachesSubsequentCallsForTheSameSchema() { + // arrange + final SchemaReference reference = new SchemaReference("subject", 1, "avro"); + when(mockDelegate.getSchema(reference)).thenReturn("schema"); + + // act + final String schema = producerSchemaRetriever.getSchema(reference); + final String secondSchema = producerSchemaRetriever.getSchema(reference); + + // assert + assertThat(schema).isSameAs(secondSchema); + verify(mockDelegate, times(1)).getSchema(reference); + } + + @Test + void cachesAreAppliedPerReferenceValuesNotByInstance() { + // arrange + final SchemaReference reference = new SchemaReference("subject", 1, "avro"); + final SchemaReference anotherReference = new SchemaReference("subject", 1, "avro"); + when(mockDelegate.getSchema(any())).thenReturn("schema"); + + // act + final String schema = producerSchemaRetriever.getSchema(reference); + final String secondSchema = producerSchemaRetriever.getSchema(anotherReference); + + // assert + assertThat(schema).isSameAs(secondSchema); + verify(mockDelegate, times(1)).getSchema(reference); + } + + @Test + void differentSchemaReferencesWillCacheDifferentValues() { + // arrange + final SchemaReference reference = new SchemaReference("subject", 1, "avro"); + final SchemaReference secondReference = new SchemaReference("subject", 2, "avro"); + when(mockDelegate.getSchema(reference)).thenReturn("schema"); + when(mockDelegate.getSchema(secondReference)).thenReturn("secondSchema"); + + // act + producerSchemaRetriever.getSchema(reference); + final String schema = producerSchemaRetriever.getSchema(reference); + producerSchemaRetriever.getSchema(secondReference); + final String secondSchema = producerSchemaRetriever.getSchema(secondReference); + + // assert + assertThat(schema).isEqualTo("schema"); + assertThat(secondSchema).isEqualTo("secondSchema"); + verify(mockDelegate, times(1)).getSchema(reference); + verify(mockDelegate, times(1)).getSchema(secondReference); + } +} \ No newline at end of file diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/test/java/com/jashmore/sqs/extensions/registry/MessageAttributeSchemaReferenceExtractorTest.java b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/test/java/com/jashmore/sqs/extensions/registry/MessageAttributeSchemaReferenceExtractorTest.java new file mode 100644 index 00000000..a32534e5 --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/test/java/com/jashmore/sqs/extensions/registry/MessageAttributeSchemaReferenceExtractorTest.java @@ -0,0 +1,110 @@ +package com.jashmore.sqs.extensions.registry; + +import static com.jashmore.sqs.extensions.registry.MessageAttributeSchemaReferenceExtractor.CONTENT_TYPE_MESSAGE_ATTRIBUTE_NAME; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.google.common.collect.ImmutableMap; + +import org.junit.jupiter.api.Test; +import org.springframework.cloud.schema.registry.SchemaReference; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; + +class MessageAttributeSchemaReferenceExtractorTest { + private final MessageAttributeSchemaReferenceExtractor messageAttributeSchemaReferenceExtractor = new MessageAttributeSchemaReferenceExtractor(); + + @Test + void noMessageAttributeWIthNameThrowsException() { + // arrange + final Message message = Message.builder().build(); + + // act + final SchemaReferenceExtractorException exception = assertThrows(SchemaReferenceExtractorException.class, + () -> messageAttributeSchemaReferenceExtractor.extract(message)); + + // assert + assertThat(exception).hasMessageContaining("No attribute found with name: contentType"); + } + + @Test + void contentTypeValueThatIsNotStringThrowsException() { + // arrange + final Message message = Message.builder() + .messageAttributes(ImmutableMap.of( + CONTENT_TYPE_MESSAGE_ATTRIBUTE_NAME, MessageAttributeValue.builder() + .dataType("Binary") + .binaryValue(SdkBytes.fromByteArray(new byte[]{0, 1})) + .build() + )) + .build(); + + // act + final SchemaReferenceExtractorException exception = assertThrows(SchemaReferenceExtractorException.class, + () -> messageAttributeSchemaReferenceExtractor.extract(message)); + + // assert + assertThat(exception).hasMessageContaining("Attribute expected to be a String but is of type: Binary"); + } + + @Test + void contentTypeThatIsUnsuccessfullyParsedThrowsException() { + // arrange + final Message message = Message.builder() + .messageAttributes(ImmutableMap.of( + CONTENT_TYPE_MESSAGE_ATTRIBUTE_NAME, MessageAttributeValue.builder() + .dataType("String") + .stringValue("invalid-format") + .build() + )) + .build(); + + // act + final SchemaReferenceExtractorException exception = assertThrows(SchemaReferenceExtractorException.class, + () -> messageAttributeSchemaReferenceExtractor.extract(message)); + + // assert + assertThat(exception).hasMessageContaining("Content type attribute value is not in the expected format: invalid-format"); + } + + @Test + void contentTypeWithNonNumberVersionThrowsException() { + // arrange + final Message message = Message.builder() + .messageAttributes(ImmutableMap.of( + CONTENT_TYPE_MESSAGE_ATTRIBUTE_NAME, MessageAttributeValue.builder() + .dataType("String") + .stringValue("application/prefix.name.vOne+avro") + .build() + )) + .build(); + + // act + final SchemaReferenceExtractorException exception = assertThrows(SchemaReferenceExtractorException.class, + () -> messageAttributeSchemaReferenceExtractor.extract(message)); + + // assert + assertThat(exception).hasMessageContaining("Content type attribute value is not in the expected format: application/prefix.name.vOne+avro"); + } + + + @Test + void contentTypeThatCanBeSuccessfullyParsedReturnsSchemaReference() { + // arrange + final Message message = Message.builder() + .messageAttributes(ImmutableMap.of( + CONTENT_TYPE_MESSAGE_ATTRIBUTE_NAME, MessageAttributeValue.builder() + .dataType("String") + .stringValue("application/prefix.name.v1+avro") + .build() + )) + .build(); + + // act + final SchemaReference schemaReference = messageAttributeSchemaReferenceExtractor.extract(message); + + // assert + assertThat(schemaReference).isEqualTo(new SchemaReference("name", 1, "avro")); + } +} diff --git a/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/test/java/com/jashmore/sqs/extensions/registry/SpringCloudSchemaArgumentResolverTest.java b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/test/java/com/jashmore/sqs/extensions/registry/SpringCloudSchemaArgumentResolverTest.java new file mode 100644 index 00000000..46137d7d --- /dev/null +++ b/extensions/spring-cloud-schema-registry-extension/spring-cloud-schema-registry-extension-api/src/test/java/com/jashmore/sqs/extensions/registry/SpringCloudSchemaArgumentResolverTest.java @@ -0,0 +1,103 @@ +package com.jashmore.sqs.extensions.registry; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.jashmore.sqs.QueueProperties; +import com.jashmore.sqs.argument.ArgumentResolutionException; +import com.jashmore.sqs.argument.DefaultMethodParameter; +import com.jashmore.sqs.argument.MethodParameter; +import org.junit.jupiter.api.Test; +import org.springframework.cloud.schema.registry.SchemaReference; +import software.amazon.awssdk.services.sqs.model.Message; + +import java.lang.reflect.Method; + +class SpringCloudSchemaArgumentResolverTest { + @Test + void willAllowMethodParametersToBeProcessedWithCertainAnnotation() throws NoSuchMethodException { + // arrange + final SpringCloudSchemaArgumentResolver resolver = new SpringCloudSchemaArgumentResolver<>( + message -> new SchemaReference("name", 1, "numbered"), + (clazz) -> 2, + (reference) -> 1, + (message, producerSchema, consumerSchema, clazz) -> "value", + SpringCloudSchemaRegistryPayload.class + ); + final Method method = SpringCloudSchemaArgumentResolverTest.class.getMethod("myMethod", String.class, String.class); + final MethodParameter methodParameter = new DefaultMethodParameter( + method, + method.getParameters()[0], + 0 + ); + final MethodParameter secondMethodParameter = new DefaultMethodParameter( + method, + method.getParameters()[1], + 1 + ); + + // act + assertThat(resolver.canResolveParameter(methodParameter)).isTrue(); + assertThat(resolver.canResolveParameter(secondMethodParameter)).isFalse(); + } + + @Test + void willObtainSchemasAndDeserializeWithTheProvidedValues() throws NoSuchMethodException { + // arrange + final SpringCloudSchemaArgumentResolver resolver = new SpringCloudSchemaArgumentResolver<>( + message -> new SchemaReference("name", 1, "numbered"), + (clazz) -> 2, + (reference) -> 1, + (message, producerSchema, consumerSchema, clazz) + -> "producer: " + producerSchema + " consumer: " + consumerSchema + " class: " + clazz.getSimpleName(), + SpringCloudSchemaRegistryPayload.class + ); + final Method method = SpringCloudSchemaArgumentResolverTest.class.getMethod("myMethod", String.class, String.class); + final MethodParameter methodParameter = new DefaultMethodParameter( + method, + method.getParameters()[0], + 0 + ); + + // act + final Object value = resolver.resolveArgumentForParameter(QueueProperties.builder().build(), methodParameter, Message.builder().build()); + + // assert + assertThat(value).isEqualTo("producer: 1 consumer: 2 class: String"); + } + + @Test + void anyExceptionDuringResolutionThrowsArgumentResolutionException() throws NoSuchMethodException { + // arrange + final ProducerSchemaRetrieverException producerSchemaRetrieverException + = new ProducerSchemaRetrieverException(new RuntimeException("Expected Test Exception")); + final SpringCloudSchemaArgumentResolver resolver = new SpringCloudSchemaArgumentResolver<>( + message -> new SchemaReference("name", 1, "numbered"), + (clazz) -> { + throw producerSchemaRetrieverException; + }, + (reference) -> 1, + (message, producerSchema, consumerSchema, clazz) + -> "producer: " + producerSchema + " consumer: " + consumerSchema + " class: " + clazz.getSimpleName(), + SpringCloudSchemaRegistryPayload.class + ); + final Method method = SpringCloudSchemaArgumentResolverTest.class.getMethod("myMethod", String.class, String.class); + final MethodParameter methodParameter = new DefaultMethodParameter( + method, + method.getParameters()[0], + 0 + ); + + // act + final ArgumentResolutionException exception = assertThrows(ArgumentResolutionException.class, + () -> resolver.resolveArgumentForParameter(QueueProperties.builder().build(), methodParameter, Message.builder().build())); + + // assert + assertThat(exception).hasCause(producerSchemaRetrieverException); + } + + + public void myMethod(@SpringCloudSchemaRegistryPayload String exampleParameter, String anotherPayload) { + + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 7f19be6f..3c5aa391 100644 --- a/pom.xml +++ b/pom.xml @@ -49,6 +49,7 @@ examples + extensions java-dynamic-sqs-listener-api java-dynamic-sqs-listener-core java-dynamic-sqs-listener-spring @@ -66,6 +67,7 @@ configuration/spotbugs/bugsExcludeFilter.xml 3.12.2 + 1.9.2 2.13.3 3.1.0 3.1.1 @@ -164,6 +166,12 @@ import + + org.apache.avro + avro + ${avro.version} + + cglib cglib @@ -281,6 +289,11 @@ build-helper-maven-plugin ${builder-helper-maven-plugin.version} + + org.apache.avro + avro-maven-plugin + ${avro.version} + org.springframework.boot spring-boot-maven-plugin @@ -365,6 +378,11 @@ org.apache.maven.plugins maven-checkstyle-plugin ${checkstyle-plugin.version} + + + ${project.build.sourceDirectory} + + validate @@ -396,6 +414,7 @@ + true true Low Max diff --git a/util/avro-spring-cloud-schema-registry-sqs-client/pom.xml b/util/avro-spring-cloud-schema-registry-sqs-client/pom.xml new file mode 100644 index 00000000..1de4e93c --- /dev/null +++ b/util/avro-spring-cloud-schema-registry-sqs-client/pom.xml @@ -0,0 +1,44 @@ + + + + util + com.jashmore + 3.0.1-SNAPSHOT + + 4.0.0 + + avro-spring-cloud-schema-registry-sqs-client + + Java Dynamic SQS Listener - Utilities - Avro Spring Cloud Registry SQS Client + Wrapper around the AWS SQS Async Client to help serialize messages using Avro and the Spring Cloud Schema Registry. + + + + software.amazon.awssdk + sqs + + + + org.springframework.cloud + spring-cloud-schema-registry-client + + + + org.apache.avro + avro + + + + com.google.guava + guava + + + + org.projectlombok + lombok + provided + + + diff --git a/util/avro-spring-cloud-schema-registry-sqs-client/src/main/java/com/jashmore/sqs/registry/AvroSchemaRegistrySqsAsyncClient.java b/util/avro-spring-cloud-schema-registry-sqs-client/src/main/java/com/jashmore/sqs/registry/AvroSchemaRegistrySqsAsyncClient.java new file mode 100644 index 00000000..5e21c482 --- /dev/null +++ b/util/avro-spring-cloud-schema-registry-sqs-client/src/main/java/com/jashmore/sqs/registry/AvroSchemaRegistrySqsAsyncClient.java @@ -0,0 +1,163 @@ +package com.jashmore.sqs.registry; + +import static java.util.stream.Collectors.toMap; +import static org.springframework.cloud.schema.registry.avro.AvroSchemaRegistryClientMessageConverter.AVRO_FORMAT; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; + +import lombok.Builder; +import lombok.Value; +import lombok.experimental.Delegate; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.Schema; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import org.springframework.cloud.schema.registry.SchemaReference; +import org.springframework.cloud.schema.registry.SchemaRegistrationResponse; +import org.springframework.cloud.schema.registry.avro.AvroSchemaServiceManager; +import org.springframework.cloud.schema.registry.client.SchemaRegistryClient; +import org.springframework.core.io.Resource; +import org.springframework.util.ObjectUtils; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; +import software.amazon.awssdk.utils.SdkAutoCloseable; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Base64; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Stream; + +/** + * Helper client that is able to serialize a Pojo using Avro. + * + *

This makes sure that the schema that was used to serialize this object via the {@link SchemaRegistryClient}. + * + *

Note that this was built for testing purposes in this library only and is not meant to be production quality. + */ +@Slf4j +public class AvroSchemaRegistrySqsAsyncClient implements SqsAsyncClient { + @Delegate(excludes = SdkAutoCloseable.class) + private final SqsAsyncClient delegate; + private final SchemaRegistryClient schemaRegistryClient; + private final AvroSchemaServiceManager avroSchemaServiceManager; + private final Map, RegisteredSchema> schemaCache; + + public AvroSchemaRegistrySqsAsyncClient(final SqsAsyncClient delegate, + final SchemaRegistryClient schemaRegistryClient, + final AvroSchemaServiceManager avroSchemaServiceManager, + final List schemaImports, + final List schemaLocations) { + this.delegate = delegate; + this.schemaRegistryClient = schemaRegistryClient; + this.avroSchemaServiceManager = avroSchemaServiceManager; + this.schemaCache = parseAvailableSchemas(schemaImports, schemaLocations); + } + + /** + * Send a payload as a SQS message where it is serialized via the avro serialization process. + * + *

This uses message attributes as the content type carrier and the whole body is included in the payload of the message. + * + * @param contentTypePrefix the custom prefix that could be included in the message + * @param messageAttributeName the name of the attribute that will contain the content type + * @param payload the payload to send + * @param requestBuilder the builder that can be used to configure the request, such as configuring the URL, etc + * @param the type of the payload + * @return the future containing the response for sending the message + */ + public CompletableFuture sendAvroMessage(final String contentTypePrefix, + final String messageAttributeName, + final T payload, + final Consumer requestBuilder) { + final RegisteredSchema registeredSchema = schemaCache.get(payload.getClass()); + final SchemaReference reference = registeredSchema.reference; + + return delegate.sendMessage(builder -> builder.applyMutation(requestBuilder) + .messageAttributes(ImmutableMap.of(messageAttributeName, MessageAttributeValue.builder() + .dataType("String") + .stringValue(generateContentType(contentTypePrefix, reference)) + .build())) + .messageBody(serializeObject(payload, registeredSchema.schema))); + } + + private Map, RegisteredSchema> parseAvailableSchemas(final List schemaImports, + final List schemaLocations) { + final Schema.Parser schemaParser = new Schema.Parser(); + return Stream.of(schemaImports, schemaLocations) + .filter(arr -> !ObjectUtils.isEmpty(arr)) + .distinct() + .flatMap(List::stream) + .flatMap(resource -> { + final Schema schema; + try { + schema = schemaParser.parse(resource.getInputStream()); + } catch (IOException ioException) { + throw new RuntimeException("Error parsing schema: " + resource.getFilename(), ioException); + } + if (schema.getType().equals(Schema.Type.UNION)) { + return schema.getTypes().stream(); + } else { + return Stream.of(schema); + } + }) + .map(this::createObjectMapping) + .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private Map.Entry, RegisteredSchema> createObjectMapping(final Schema schema) { + final SchemaRegistrationResponse response = this.schemaRegistryClient.register( + schema.getName().toLowerCase(Locale.ENGLISH), AVRO_FORMAT, schema.toString() + ); + log.info("Schema {} registered with id {}", schema.getName(), response.getId()); + final Class clazz; + try { + clazz = Class.forName(schema.getNamespace() + "." + schema.getName()); + } catch (ClassNotFoundException classNotFoundException) { + throw new RuntimeException(classNotFoundException); + } + + return Maps.immutableEntry(clazz, RegisteredSchema.builder() + .schema(schema) + .reference(response.getSchemaReference()) + .build()); + } + + private String serializeObject(final T payload, final Schema schema) { + final Class clazz = payload.getClass(); + final DatumWriter writer = avroSchemaServiceManager.getDatumWriter(clazz, schema); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null); + try { + writer.write(payload, encoder); + encoder.flush(); + } catch (IOException ioException) { + throw new RuntimeException("Error serializing payload", ioException); + } + return Base64.getEncoder().encodeToString(baos.toByteArray()); + } + + @Value + @Builder + private static class RegisteredSchema { + SchemaReference reference; + Schema schema; + } + + @Override + public void close() { + // Just needed cause lombok is odd for this class... + } + + private String generateContentType(final String prefix, final SchemaReference reference) { + return "application/" + prefix + "." + reference.getSubject() + ".v" + reference.getVersion() + "+" + reference.getFormat(); + } +} diff --git a/util/pom.xml b/util/pom.xml index c3c6a6d3..2c1124c8 100644 --- a/util/pom.xml +++ b/util/pom.xml @@ -13,6 +13,7 @@ Contains utility modules that are needed across multiple aspects of the framework + avro-spring-cloud-schema-registry-sqs-client common-utils expected-test-exception local-amazon-sqs