From d5a7b15defeae86e07613b785a61784f798a3f97 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Tue, 14 Jul 2020 15:05:39 +0200 Subject: [PATCH 1/4] #215 - Prepare issue branch. --- pom.xml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 342f0572..a7c0cc36 100644 --- a/pom.xml +++ b/pom.xml @@ -1,11 +1,13 @@ - + 4.0.0 org.springframework.data spring-data-r2dbc - 1.2.0-SNAPSHOT + 1.2.0-gh-215-SNAPSHOT Spring Data R2DBC Spring Data module for R2DBC From 16f3bb136073c9be70754f91c21b9782d06b340b Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Tue, 14 Jul 2020 15:06:22 +0200 Subject: [PATCH 2/4] #215 - Add support for EntityCallbacks. We now support entity callbacks for: * AfterConvertCallback * BeforeConvertCallback * BeforeSaveCallback * AfterSaveCallback through R2dbcEntityTemplate. --- src/main/asciidoc/new-features.adoc | 6 + .../reference/r2dbc-entity-callbacks.adoc | 38 ++++ .../reference/r2dbc-repositories.adoc | 3 + .../data/r2dbc/core/R2dbcEntityTemplate.java | 214 ++++++++++++++---- .../core/ReactiveSelectOperationSupport.java | 8 +- .../mapping/event/AfterConvertCallback.java | 42 ++++ .../mapping/event/AfterSaveCallback.java | 44 ++++ .../mapping/event/BeforeConvertCallback.java | 42 ++++ .../mapping/event/BeforeSaveCallback.java | 48 ++++ .../r2dbc/mapping/event/package-info.java | 5 + .../core/R2dbcEntityTemplateUnitTests.java | 175 ++++++++++++++ 11 files changed, 583 insertions(+), 42 deletions(-) create mode 100644 src/main/asciidoc/reference/r2dbc-entity-callbacks.adoc create mode 100644 src/main/java/org/springframework/data/r2dbc/mapping/event/AfterConvertCallback.java create mode 100644 src/main/java/org/springframework/data/r2dbc/mapping/event/AfterSaveCallback.java create mode 100644 src/main/java/org/springframework/data/r2dbc/mapping/event/BeforeConvertCallback.java create mode 100644 src/main/java/org/springframework/data/r2dbc/mapping/event/BeforeSaveCallback.java create mode 100644 src/main/java/org/springframework/data/r2dbc/mapping/event/package-info.java diff --git a/src/main/asciidoc/new-features.adoc b/src/main/asciidoc/new-features.adoc index ae64271b..bdeff7f0 100644 --- a/src/main/asciidoc/new-features.adoc +++ b/src/main/asciidoc/new-features.adoc @@ -1,7 +1,13 @@ [[new-features]] = New & Noteworthy +[[new-features.1-2-0]] +== What's New in Spring Data R2DBC 1.2.0 + +* Support for <>. + [[new-features.1-1-0-RELEASE]] + == What's New in Spring Data R2DBC 1.1.0 RELEASE * Introduction of `R2dbcEntityTemplate` for entity-oriented operations. diff --git a/src/main/asciidoc/reference/r2dbc-entity-callbacks.adoc b/src/main/asciidoc/reference/r2dbc-entity-callbacks.adoc new file mode 100644 index 00000000..d0cac8b3 --- /dev/null +++ b/src/main/asciidoc/reference/r2dbc-entity-callbacks.adoc @@ -0,0 +1,38 @@ +[[r2dbc.entity-callbacks]] += Store specific EntityCallbacks + +Spring Data R2DBC uses the `EntityCallback` API and reacts on the following callbacks. + +.Supported Entity Callbacks +[%header,cols="4"] +|=== +| Callback +| Method +| Description +| Order + +| BeforeConvertCallback +| `onBeforeConvert(T entity, SqlIdentifier table)` +| Invoked before a domain object is converted to `OutboundRow`. +| `Ordered.LOWEST_PRECEDENCE` + +| AfterConvertCallback +| `onAfterConvert(T entity, SqlIdentifier table)` +| Invoked after a domain object is loaded. + +Can modify the domain object after reading it from a row. +| `Ordered.LOWEST_PRECEDENCE` + +| BeforeSaveCallback +| `onBeforeSave(T entity, OutboundRow row, SqlIdentifier table)` +| Invoked before a domain object is saved. + +Can modify the target, to be persisted, `OutboundRow` containing all mapped entity information. +| `Ordered.LOWEST_PRECEDENCE` + +| AfterSaveCallback +| `onAfterSave(T entity, OutboundRow row, SqlIdentifier table)` +| Invoked before a domain object is saved. + +Can modify the domain object, to be returned after save, `OutboundRow` containing all mapped entity information. +| `Ordered.LOWEST_PRECEDENCE` + +|=== + diff --git a/src/main/asciidoc/reference/r2dbc-repositories.adoc b/src/main/asciidoc/reference/r2dbc-repositories.adoc index 2b9140ba..4ac16782 100644 --- a/src/main/asciidoc/reference/r2dbc-repositories.adoc +++ b/src/main/asciidoc/reference/r2dbc-repositories.adoc @@ -336,3 +336,6 @@ With auto-increment columns, this happens automatically, because the ID gets set :projection-collection: Flux include::../{spring-data-commons-docs}/repository-projections.adoc[leveloffset=+2] +include::../{spring-data-commons-docs}/entity-callbacks.adoc[leveloffset=+1] +include::./r2dbc-entity-callbacks.adoc[leveloffset=+2] + diff --git a/src/main/java/org/springframework/data/r2dbc/core/R2dbcEntityTemplate.java b/src/main/java/org/springframework/data/r2dbc/core/R2dbcEntityTemplate.java index e340a58b..101be5f4 100644 --- a/src/main/java/org/springframework/data/r2dbc/core/R2dbcEntityTemplate.java +++ b/src/main/java/org/springframework/data/r2dbc/core/R2dbcEntityTemplate.java @@ -23,13 +23,17 @@ import java.beans.FeatureDescriptor; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.stream.Collectors; +import org.reactivestreams.Publisher; + import org.springframework.beans.BeansException; -import org.springframework.beans.factory.BeanFactory; -import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.core.convert.ConversionService; import org.springframework.dao.DataAccessException; import org.springframework.dao.OptimisticLockingFailureException; @@ -37,9 +41,16 @@ import org.springframework.data.mapping.IdentifierAccessor; import org.springframework.data.mapping.MappingException; import org.springframework.data.mapping.PersistentPropertyAccessor; +import org.springframework.data.mapping.callback.ReactiveEntityCallbacks; import org.springframework.data.mapping.context.MappingContext; import org.springframework.data.projection.ProjectionInformation; import org.springframework.data.projection.SpelAwareProxyProjectionFactory; +import org.springframework.data.r2dbc.mapping.OutboundRow; +import org.springframework.data.r2dbc.mapping.SettableValue; +import org.springframework.data.r2dbc.mapping.event.AfterConvertCallback; +import org.springframework.data.r2dbc.mapping.event.AfterSaveCallback; +import org.springframework.data.r2dbc.mapping.event.BeforeConvertCallback; +import org.springframework.data.r2dbc.mapping.event.BeforeSaveCallback; import org.springframework.data.relational.core.mapping.RelationalPersistentEntity; import org.springframework.data.relational.core.mapping.RelationalPersistentProperty; import org.springframework.data.relational.core.query.Criteria; @@ -51,6 +62,7 @@ import org.springframework.data.relational.core.sql.SqlIdentifier; import org.springframework.data.relational.core.sql.Table; import org.springframework.data.util.ProxyUtils; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -65,7 +77,7 @@ * @author Bogdan Ilchyshyn * @since 1.1 */ -public class R2dbcEntityTemplate implements R2dbcEntityOperations, BeanFactoryAware { +public class R2dbcEntityTemplate implements R2dbcEntityOperations, ApplicationContextAware { private final DatabaseClient databaseClient; @@ -75,6 +87,8 @@ public class R2dbcEntityTemplate implements R2dbcEntityOperations, BeanFactoryAw private final SpelAwareProxyProjectionFactory projectionFactory; + private @Nullable ReactiveEntityCallbacks entityCallbacks; + /** * Create a new {@link R2dbcEntityTemplate} given {@link DatabaseClient}. * @@ -111,11 +125,34 @@ public DatabaseClient getDatabaseClient() { /* * (non-Javadoc) - * @see org.springframework.beans.factory.BeanFactoryAware#setBeanFactory(org.springframework.beans.factory.BeanFactory) + * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) */ @Override - public void setBeanFactory(BeanFactory beanFactory) throws BeansException { - this.projectionFactory.setBeanFactory(beanFactory); + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + + if (entityCallbacks == null) { + setEntityCallbacks(ReactiveEntityCallbacks.create(applicationContext)); + } + + projectionFactory.setBeanFactory(applicationContext); + projectionFactory.setBeanClassLoader(applicationContext.getClassLoader()); + } + + /** + * Set the {@link ReactiveEntityCallbacks} instance to use when invoking + * {@link org.springframework.data.mapping.callback.ReactiveEntityCallbacks callbacks} like the + * {@link BeforeSaveCallback}. + *

+ * Overrides potentially existing {@link ReactiveEntityCallbacks}. + * + * @param entityCallbacks must not be {@literal null}. + * @throws IllegalArgumentException if the given instance is {@literal null}. + * @since 1.2 + */ + public void setEntityCallbacks(ReactiveEntityCallbacks entityCallbacks) { + + Assert.notNull(entityCallbacks, "EntityCallbacks must not be null!"); + this.entityCallbacks = entityCallbacks; } // ------------------------------------------------------------------------- @@ -248,10 +285,27 @@ public Flux select(Query query, Class entityClass) throws DataAccessEx Assert.notNull(query, "Query must not be null"); Assert.notNull(entityClass, "entity class must not be null"); - return doSelect(query, entityClass, getTableName(entityClass), entityClass).all(); + SqlIdentifier tableName = getTableName(entityClass); + return doSelect(query, entityClass, tableName, entityClass, RowsFetchSpec::all); + } + + @SuppressWarnings("unchecked") + > P doSelect(Query query, Class entityClass, SqlIdentifier tableName, + Class returnType, Function, P> resultHandler) { + + RowsFetchSpec fetchSpec = doSelect(query, entityClass, tableName, returnType); + + P result = resultHandler.apply(fetchSpec); + + if (result instanceof Mono) { + return (P) ((Mono) result).flatMap(it -> maybeCallAfterConvert(it, tableName)); + } + + return (P) ((Flux) result).flatMap(it -> maybeCallAfterConvert(it, tableName)); } - RowsFetchSpec doSelect(Query query, Class entityClass, SqlIdentifier tableName, Class returnType) { + private RowsFetchSpec doSelect(Query query, Class entityClass, SqlIdentifier tableName, + Class returnType) { StatementMapper statementMapper = dataAccessStrategy.getStatementMapper().forType(entityClass); @@ -295,7 +349,7 @@ RowsFetchSpec doSelect(Query query, Class entityClass, SqlIdentifier t */ @Override public Mono selectOne(Query query, Class entityClass) throws DataAccessException { - return doSelect(query.limit(2), entityClass, getTableName(entityClass), entityClass).one(); + return doSelect(query.limit(2), entityClass, getTableName(entityClass), entityClass, RowsFetchSpec::one); } /* @@ -377,14 +431,33 @@ Mono doInsert(T entity, SqlIdentifier tableName) { RelationalPersistentEntity persistentEntity = getRequiredEntity(entity); - T entityToInsert = setVersionIfNecessary(persistentEntity, entity); + return Mono.defer(() -> maybeCallBeforeConvert(setVersionIfNecessary(persistentEntity, entity), tableName) + .flatMap(beforeConvert -> { - return this.databaseClient.insert() // - .into(persistentEntity.getType()) // - .table(tableName).using(entityToInsert) // - .map(this.dataAccessStrategy.getConverter().populateIdIfNecessary(entityToInsert)) // - .first() // - .defaultIfEmpty(entityToInsert); + OutboundRow outboundRow = dataAccessStrategy.getOutboundRow(beforeConvert); + + return maybeCallBeforeSave(beforeConvert, outboundRow, tableName).flatMap(entityToSave -> { + + StatementMapper mapper = dataAccessStrategy.getStatementMapper(); + StatementMapper.InsertSpec insert = mapper.createInsert(tableName); + + for (SqlIdentifier column : outboundRow.keySet()) { + SettableValue settableValue = outboundRow.get(column); + if (settableValue.hasValue()) { + insert = insert.withColumn(column, settableValue); + } + } + + PreparedOperation operation = mapper.getMappedObject(insert); + + return this.databaseClient.execute(operation) // + .filter(statement -> statement.returnGeneratedValues()) + .map(this.dataAccessStrategy.getConverter().populateIdIfNecessary(entityToSave)) // + .first() // + .defaultIfEmpty(entityToSave) // + .flatMap(saved -> maybeCallAfterSave(saved, outboundRow, tableName)); + }); + })); } @SuppressWarnings("unchecked") @@ -413,37 +486,62 @@ public Mono update(T entity) throws DataAccessException { Assert.notNull(entity, "Entity must not be null"); + return doUpdate(entity, getRequiredEntity(entity).getTableName()); + } + + private Mono doUpdate(T entity, SqlIdentifier tableName) { + RelationalPersistentEntity persistentEntity = getRequiredEntity(entity); - DatabaseClient.TypedUpdateSpec updateMatchingSpec = this.databaseClient.update() // - .table(persistentEntity.getType()) // - .table(persistentEntity.getTableName()); + return maybeCallBeforeConvert(entity, tableName).flatMap(beforeConvert -> { - DatabaseClient.UpdateSpec matching; - T entityToUpdate; - if (persistentEntity.hasVersionProperty()) { + OutboundRow outboundRow = dataAccessStrategy.getOutboundRow(entity); - Criteria criteria = createMatchingVersionCriteria(entity, persistentEntity); - entityToUpdate = incrementVersion(persistentEntity, entity); - matching = updateMatchingSpec.using(entityToUpdate).matching(criteria); - } else { - entityToUpdate = entity; - matching = updateMatchingSpec.using(entity); - } + return maybeCallBeforeSave(beforeConvert, outboundRow, tableName) // + .flatMap(entityToSave -> { - return matching.fetch() // - .rowsUpdated() // - .flatMap(rowsUpdated -> rowsUpdated == 0 ? handleMissingUpdate(entityToUpdate, persistentEntity) - : Mono.just(entityToUpdate)); - } + SqlIdentifier idColumn = persistentEntity.getRequiredIdProperty().getColumnName(); + SettableValue id = outboundRow.remove(idColumn); + Criteria criteria = Criteria.where(dataAccessStrategy.toSql(idColumn)).is(id); + + T saved; + + if (persistentEntity.hasVersionProperty()) { + criteria = criteria.and(createMatchingVersionCriteria(entity, persistentEntity)); + saved = incrementVersion(persistentEntity, entity, outboundRow); + } else { + saved = entityToSave; + } + + Update update = Update.from((Map) outboundRow); - private Mono handleMissingUpdate(T entity, RelationalPersistentEntity persistentEntity) { + StatementMapper mapper = dataAccessStrategy.getStatementMapper(); + StatementMapper.UpdateSpec updateSpec = mapper.createUpdate(tableName, update).withCriteria(criteria); - return Mono.error(persistentEntity.hasVersionProperty() - ? new OptimisticLockingFailureException(formatOptimisticLockingExceptionMessage(entity, persistentEntity)) - : new TransientDataAccessResourceException(formatTransientEntityExceptionMessage(entity, persistentEntity))); + PreparedOperation operation = mapper.getMappedObject(updateSpec); + + return this.databaseClient.execute(operation) // + .fetch() // + .rowsUpdated() // + .handle((rowsUpdated, sink) -> { + + if (rowsUpdated != 0) { + return; + } + + if (persistentEntity.hasVersionProperty()) { + sink.error(new OptimisticLockingFailureException( + formatOptimisticLockingExceptionMessage(saved, persistentEntity))); + } else { + sink.error(new TransientDataAccessResourceException( + formatTransientEntityExceptionMessage(saved, persistentEntity))); + } + }).then(maybeCallAfterSave(saved, outboundRow, tableName)); + }); + }); } + private String formatOptimisticLockingExceptionMessage(T entity, RelationalPersistentEntity persistentEntity) { return String.format("Failed to update table [%s]. Version does not match for row with Id [%s].", @@ -457,7 +555,7 @@ private String formatTransientEntityExceptionMessage(T entity, RelationalPer } @SuppressWarnings("unchecked") - private T incrementVersion(RelationalPersistentEntity persistentEntity, T entity) { + private T incrementVersion(RelationalPersistentEntity persistentEntity, T entity, OutboundRow outboundRow) { PersistentPropertyAccessor propertyAccessor = persistentEntity.getPropertyAccessor(entity); RelationalPersistentProperty versionProperty = persistentEntity.getVersionProperty(); @@ -471,6 +569,8 @@ private T incrementVersion(RelationalPersistentEntity persistentEntity, T Class versionPropertyType = versionProperty.getType(); propertyAccessor.setProperty(versionProperty, conversionService.convert(newVersionValue, versionPropertyType)); + outboundRow.put(versionProperty.getColumnName(), SettableValue.from(newVersionValue)); + return (T) propertyAccessor.getBean(); } @@ -502,6 +602,42 @@ public Mono delete(T entity) throws DataAccessException { return delete(getByIdQuery(entity, persistentEntity), persistentEntity.getType()).thenReturn(entity); } + protected Mono maybeCallBeforeConvert(T object, SqlIdentifier table) { + + if (entityCallbacks != null) { + return entityCallbacks.callback(BeforeConvertCallback.class, object, table); + } + + return Mono.just(object); + } + + protected Mono maybeCallBeforeSave(T object, OutboundRow row, SqlIdentifier table) { + + if (entityCallbacks != null) { + return entityCallbacks.callback(BeforeSaveCallback.class, object, row, table); + } + + return Mono.just(object); + } + + protected Mono maybeCallAfterSave(T object, OutboundRow row, SqlIdentifier table) { + + if (entityCallbacks != null) { + return entityCallbacks.callback(AfterSaveCallback.class, object, row, table); + } + + return Mono.just(object); + } + + protected Mono maybeCallAfterConvert(T object, SqlIdentifier table) { + + if (entityCallbacks != null) { + return entityCallbacks.callback(AfterConvertCallback.class, object, table); + } + + return Mono.just(object); + } + private Query getByIdQuery(T entity, RelationalPersistentEntity persistentEntity) { if (!persistentEntity.hasIdProperty()) { throw new MappingException("No id property found for object of type " + persistentEntity.getType() + "!"); diff --git a/src/main/java/org/springframework/data/r2dbc/core/ReactiveSelectOperationSupport.java b/src/main/java/org/springframework/data/r2dbc/core/ReactiveSelectOperationSupport.java index a7a05b3b..c86f33fb 100644 --- a/src/main/java/org/springframework/data/r2dbc/core/ReactiveSelectOperationSupport.java +++ b/src/main/java/org/springframework/data/r2dbc/core/ReactiveSelectOperationSupport.java @@ -127,7 +127,8 @@ public Mono exists() { */ @Override public Mono first() { - return this.template.doSelect(this.query.limit(1), this.domainType, getTableName(), this.returnType).first(); + return this.template.doSelect(this.query.limit(1), this.domainType, getTableName(), this.returnType, + RowsFetchSpec::first); } /* @@ -136,7 +137,8 @@ public Mono first() { */ @Override public Mono one() { - return this.template.doSelect(this.query.limit(2), this.domainType, getTableName(), this.returnType).one(); + return this.template.doSelect(this.query.limit(2), this.domainType, getTableName(), this.returnType, + RowsFetchSpec::one); } /* @@ -145,7 +147,7 @@ public Mono one() { */ @Override public Flux all() { - return this.template.doSelect(this.query, this.domainType, getTableName(), this.returnType).all(); + return this.template.doSelect(this.query, this.domainType, getTableName(), this.returnType, RowsFetchSpec::all); } private SqlIdentifier getTableName() { diff --git a/src/main/java/org/springframework/data/r2dbc/mapping/event/AfterConvertCallback.java b/src/main/java/org/springframework/data/r2dbc/mapping/event/AfterConvertCallback.java new file mode 100644 index 00000000..25890525 --- /dev/null +++ b/src/main/java/org/springframework/data/r2dbc/mapping/event/AfterConvertCallback.java @@ -0,0 +1,42 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.r2dbc.mapping.event; + +import org.reactivestreams.Publisher; + +import org.springframework.data.mapping.callback.EntityCallback; +import org.springframework.data.relational.core.sql.SqlIdentifier; + +/** + * Callback being invoked after a domain object is materialized from a row when reading results. + * + * @author Mark Paluch + * @since 1.2 + * @see org.springframework.data.mapping.callback.ReactiveEntityCallbacks + */ +@FunctionalInterface +public interface AfterConvertCallback extends EntityCallback { + + /** + * Entity callback method invoked after a domain object is materialized from a row. Can return either the same or a + * modified instance of the domain object. + * + * @param entity the domain object (the result of the conversion). + * @param table name of the table. + * @return the domain object that is the result of reading it from a row. + */ + Publisher onAfterConvert(T entity, SqlIdentifier table); +} diff --git a/src/main/java/org/springframework/data/r2dbc/mapping/event/AfterSaveCallback.java b/src/main/java/org/springframework/data/r2dbc/mapping/event/AfterSaveCallback.java new file mode 100644 index 00000000..a8c58c89 --- /dev/null +++ b/src/main/java/org/springframework/data/r2dbc/mapping/event/AfterSaveCallback.java @@ -0,0 +1,44 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.r2dbc.mapping.event; + +import org.reactivestreams.Publisher; + +import org.springframework.data.mapping.callback.EntityCallback; +import org.springframework.data.r2dbc.mapping.OutboundRow; +import org.springframework.data.relational.core.sql.SqlIdentifier; + +/** + * Entity callback triggered after save of a {@link OutboundRow}. + * + * @author Mark Paluch + * @since 1.2 + * @see org.springframework.data.mapping.callback.ReactiveEntityCallbacks + */ +@FunctionalInterface +public interface AfterSaveCallback extends EntityCallback { + + /** + * Entity callback method invoked after a domain object is saved. Can return either the same or a modified instance of + * the domain object. + * + * @param entity the domain object that was saved. + * @param outboundRow {@link OutboundRow} representing the {@code entity}. + * @param table name of the table. + * @return the domain object that was persisted. + */ + Publisher onAfterSave(T entity, OutboundRow outboundRow, SqlIdentifier table); +} diff --git a/src/main/java/org/springframework/data/r2dbc/mapping/event/BeforeConvertCallback.java b/src/main/java/org/springframework/data/r2dbc/mapping/event/BeforeConvertCallback.java new file mode 100644 index 00000000..e251b8cb --- /dev/null +++ b/src/main/java/org/springframework/data/r2dbc/mapping/event/BeforeConvertCallback.java @@ -0,0 +1,42 @@ +/* + * Copyright 2019-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.r2dbc.mapping.event; + +import org.reactivestreams.Publisher; + +import org.springframework.data.mapping.callback.EntityCallback; +import org.springframework.data.relational.core.sql.SqlIdentifier; + +/** + * Callback being invoked before a domain object is converted to be persisted. + * + * @author Mark Paluch + * @since 1.2 + * @see org.springframework.data.mapping.callback.ReactiveEntityCallbacks + */ +@FunctionalInterface +public interface BeforeConvertCallback extends EntityCallback { + + /** + * Entity callback method invoked before a domain object is converted to be persisted. Can return either the same or a + * modified instance of the domain object. + * + * @param entity the domain object to save. + * @param table name of the table. + * @return the domain object to be persisted. + */ + Publisher onBeforeConvert(T entity, SqlIdentifier table); +} diff --git a/src/main/java/org/springframework/data/r2dbc/mapping/event/BeforeSaveCallback.java b/src/main/java/org/springframework/data/r2dbc/mapping/event/BeforeSaveCallback.java new file mode 100644 index 00000000..c522a802 --- /dev/null +++ b/src/main/java/org/springframework/data/r2dbc/mapping/event/BeforeSaveCallback.java @@ -0,0 +1,48 @@ +/* + * Copyright 2019-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.r2dbc.mapping.event; + +import org.reactivestreams.Publisher; + +import org.springframework.data.mapping.callback.EntityCallback; +import org.springframework.data.r2dbc.mapping.OutboundRow; +import org.springframework.data.relational.core.sql.SqlIdentifier; + +/** + * Entity callback triggered before save of a row. + * + * @author Mark Paluch + * @since 1.2 + * @see org.springframework.data.mapping.callback.ReactiveEntityCallbacks + */ +@FunctionalInterface +public interface BeforeSaveCallback extends EntityCallback { + + /** + * Entity callback method invoked before a domain object is saved. Can return either the same or a modified instance + * of the domain object and can modify {@link OutboundRow} contents. This method is called after converting the + * {@code entity} to a {@link OutboundRow} so effectively the row is used as outcome of invoking this callback. + * Changes to the domain object are not taken into account for saving, only changes to the row. Only transient fields + * of the entity should be changed in this callback. To change persistent the entity before being converted, use the + * {@link BeforeConvertCallback}. + * + * @param entity the domain object to save. + * @param row {@link OutboundRow} representing the {@code entity}. + * @param table name of the table. + * @return the domain object to be persisted. + */ + Publisher onBeforeSave(T entity, OutboundRow row, SqlIdentifier table); +} diff --git a/src/main/java/org/springframework/data/r2dbc/mapping/event/package-info.java b/src/main/java/org/springframework/data/r2dbc/mapping/event/package-info.java new file mode 100644 index 00000000..94127442 --- /dev/null +++ b/src/main/java/org/springframework/data/r2dbc/mapping/event/package-info.java @@ -0,0 +1,5 @@ +/** + * Mapping event callback infrastructure for the R2DBC row-to-object mapping subsystem. + */ +@org.springframework.lang.NonNullApi +package org.springframework.data.r2dbc.mapping.event; diff --git a/src/test/java/org/springframework/data/r2dbc/core/R2dbcEntityTemplateUnitTests.java b/src/test/java/org/springframework/data/r2dbc/core/R2dbcEntityTemplateUnitTests.java index fa47fbef..0dfc007e 100644 --- a/src/test/java/org/springframework/data/r2dbc/core/R2dbcEntityTemplateUnitTests.java +++ b/src/test/java/org/springframework/data/r2dbc/core/R2dbcEntityTemplateUnitTests.java @@ -21,11 +21,15 @@ import io.r2dbc.spi.test.MockResult; import io.r2dbc.spi.test.MockRow; import io.r2dbc.spi.test.MockRowMetadata; +import lombok.ToString; import lombok.Value; import lombok.With; +import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import org.junit.Before; import org.junit.Test; @@ -33,13 +37,22 @@ import org.springframework.data.annotation.Id; import org.springframework.data.annotation.Version; import org.springframework.data.domain.Sort; +import org.springframework.data.mapping.callback.ReactiveEntityCallbacks; import org.springframework.data.r2dbc.dialect.PostgresDialect; +import org.springframework.data.r2dbc.mapping.OutboundRow; import org.springframework.data.r2dbc.mapping.SettableValue; +import org.springframework.data.r2dbc.mapping.event.AfterConvertCallback; +import org.springframework.data.r2dbc.mapping.event.AfterSaveCallback; +import org.springframework.data.r2dbc.mapping.event.BeforeConvertCallback; +import org.springframework.data.r2dbc.mapping.event.BeforeSaveCallback; import org.springframework.data.r2dbc.testing.StatementRecorder; import org.springframework.data.relational.core.mapping.Column; import org.springframework.data.relational.core.query.Criteria; import org.springframework.data.relational.core.query.Query; import org.springframework.data.relational.core.query.Update; +import org.springframework.data.relational.core.sql.SqlIdentifier; +import org.springframework.lang.Nullable; +import org.springframework.util.CollectionUtils; /** * Unit tests for {@link R2dbcEntityTemplate}. @@ -119,6 +132,31 @@ public void shouldSelectByCriteria() { assertThat(statement.getBindings()).hasSize(1).containsEntry(0, SettableValue.from("Walter")); } + @Test // gh-215 + public void selectShouldInvokeCallback() { + + MockRowMetadata metadata = MockRowMetadata.builder().columnMetadata(MockColumnMetadata.builder().name("id").build()) + .columnMetadata(MockColumnMetadata.builder().name("THE_NAME").build()).build(); + MockResult result = MockResult.builder().rowMetadata(metadata).row(MockRow.builder() + .identified("id", Object.class, "Walter").identified("THE_NAME", Object.class, "some-name").build()).build(); + + recorder.addStubbing(s -> s.startsWith("SELECT"), result); + + ValueCapturingAfterConvertCallback callback = new ValueCapturingAfterConvertCallback(); + + entityTemplate.setEntityCallbacks(ReactiveEntityCallbacks.create(callback)); + + entityTemplate.select(Query.empty(), Person.class) // + .as(StepVerifier::create) // + .consumeNextWith(actual -> { + + assertThat(actual.id).isEqualTo("after-convert"); + assertThat(actual.name).isEqualTo("some-name"); + }).verifyComplete(); + + assertThat(callback.getValues()).hasSize(1); + } + @Test // gh-220 public void shouldSelectOne() { @@ -215,6 +253,34 @@ public void shouldInsertVersioned() { SettableValue.from(1L)); } + @Test // gh-215 + public void insertShouldInvokeCallback() { + + MockRowMetadata metadata = MockRowMetadata.builder().build(); + MockResult result = MockResult.builder().rowMetadata(metadata).rowsUpdated(1).build(); + + recorder.addStubbing(s -> s.startsWith("INSERT"), result); + + ValueCapturingBeforeConvertCallback beforeConvert = new ValueCapturingBeforeConvertCallback(); + ValueCapturingBeforeSaveCallback beforeSave = new ValueCapturingBeforeSaveCallback(); + ValueCapturingAfterSaveCallback afterSave = new ValueCapturingAfterSaveCallback(); + + entityTemplate.setEntityCallbacks(ReactiveEntityCallbacks.create(beforeConvert, beforeSave, afterSave)); + entityTemplate.insert(new Person()).as(StepVerifier::create) // + .assertNext(actual -> { + assertThat(actual.id).isEqualTo("after-save"); + assertThat(actual.name).isEqualTo("before-convert"); + assertThat(actual.description).isNull(); + }) // + .verifyComplete(); + + StatementRecorder.RecordedStatement statement = recorder.getCreatedStatement(s -> s.startsWith("INSERT")); + + assertThat(statement.getSql()).isEqualTo("INSERT INTO person (THE_NAME, description) VALUES ($1, $2)"); + assertThat(statement.getBindings()).hasSize(2).containsEntry(0, SettableValue.from("before-convert")) + .containsEntry(1, SettableValue.from("before-save")); + } + @Test // gh-365 public void shouldUpdateVersioned() { @@ -237,12 +303,48 @@ public void shouldUpdateVersioned() { SettableValue.from(1L)); } + @Test // gh-215 + public void updateShouldInvokeCallback() { + + MockRowMetadata metadata = MockRowMetadata.builder().build(); + MockResult result = MockResult.builder().rowMetadata(metadata).rowsUpdated(1).build(); + + recorder.addStubbing(s -> s.startsWith("UPDATE"), result); + + ValueCapturingBeforeConvertCallback beforeConvert = new ValueCapturingBeforeConvertCallback(); + ValueCapturingBeforeSaveCallback beforeSave = new ValueCapturingBeforeSaveCallback(); + ValueCapturingAfterSaveCallback afterSave = new ValueCapturingAfterSaveCallback(); + + Person person = new Person(); + person.id = "the-id"; + person.name = "name"; + person.description = "description"; + + entityTemplate.setEntityCallbacks(ReactiveEntityCallbacks.create(beforeConvert, beforeSave, afterSave)); + entityTemplate.update(person).as(StepVerifier::create) // + .assertNext(actual -> { + assertThat(actual.id).isEqualTo("after-save"); + assertThat(actual.name).isEqualTo("before-convert"); + assertThat(actual.description).isNull(); + }) // + .verifyComplete(); + + StatementRecorder.RecordedStatement statement = recorder.getCreatedStatement(s -> s.startsWith("UPDATE")); + + assertThat(statement.getSql()).isEqualTo("UPDATE person SET THE_NAME = $1, description = $2 WHERE person.id = $3"); + assertThat(statement.getBindings()).hasSize(3).containsEntry(0, SettableValue.from("before-convert")) + .containsEntry(1, SettableValue.from("before-save")); + } + + @ToString static class Person { @Id String id; @Column("THE_NAME") String name; + String description; + public String getName() { return name; } @@ -262,4 +364,77 @@ static class VersionedPerson { String name; } + + static class ValueCapturingEntityCallback { + + private final List values = new ArrayList<>(1); + + protected void capture(T value) { + values.add(value); + } + + public List getValues() { + return values; + } + + @Nullable + public T getValue() { + return CollectionUtils.lastElement(values); + } + } + + static class ValueCapturingBeforeConvertCallback extends ValueCapturingEntityCallback + implements BeforeConvertCallback { + + @Override + public Mono onBeforeConvert(Person entity, SqlIdentifier table) { + + capture(entity); + entity.name = "before-convert"; + return Mono.just(entity); + } + } + + static class ValueCapturingBeforeSaveCallback extends ValueCapturingEntityCallback + implements BeforeSaveCallback { + + @Override + public Mono onBeforeSave(Person entity, OutboundRow outboundRow, SqlIdentifier table) { + + capture(entity); + outboundRow.put(SqlIdentifier.unquoted("description"), SettableValue.from("before-save")); + return Mono.just(entity); + } + } + + static class ValueCapturingAfterSaveCallback extends ValueCapturingEntityCallback + implements AfterSaveCallback { + + @Override + public Mono onAfterSave(Person entity, OutboundRow outboundRow, SqlIdentifier table) { + + capture(entity); + + Person person = new Person(); + person.id = "after-save"; + person.name = entity.name; + + return Mono.just(person); + } + } + + static class ValueCapturingAfterConvertCallback extends ValueCapturingEntityCallback + implements AfterConvertCallback { + + @Override + public Mono onAfterConvert(Person entity, SqlIdentifier table) { + + capture(entity); + Person person = new Person(); + person.id = "after-convert"; + person.name = entity.name; + + return Mono.just(person); + } + } } From 47c26266398f886d4c0afefac7529c451380dd86 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Tue, 14 Jul 2020 15:07:14 +0200 Subject: [PATCH 3/4] #215 - Polishing. Simplify what's new section. --- src/main/asciidoc/new-features.adoc | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/src/main/asciidoc/new-features.adoc b/src/main/asciidoc/new-features.adoc index bdeff7f0..a599f947 100644 --- a/src/main/asciidoc/new-features.adoc +++ b/src/main/asciidoc/new-features.adoc @@ -6,42 +6,29 @@ * Support for <>. -[[new-features.1-1-0-RELEASE]] - -== What's New in Spring Data R2DBC 1.1.0 RELEASE +[[new-features.1-1-0]] +== What's New in Spring Data R2DBC 1.1.0 * Introduction of `R2dbcEntityTemplate` for entity-oriented operations. * <>. * Support interface projections with `DatabaseClient.as(…)`. * <>. -[[new-features.1-0-0-RELEASE]] -== What's New in Spring Data R2DBC 1.0.0 RELEASE +[[new-features.1-0-0]] +== What's New in Spring Data R2DBC 1.0.0 * Upgrade to R2DBC 0.8.0.RELEASE. * `@Modifying` annotation for query methods to consume affected row count. * Repository `save(…)` with an associated Id terminates with `TransientDataAccessException` if the row does not exist in the database. * Added `SingleConnectionConnectionFactory` for testing using connection singletons. * Support for {spring-framework-ref}/core.html#expressions[SpEL expressions] in `@Query`. - -[[new-features.1-0-0-RC1]] -== What's New in Spring Data R2DBC 1.0.0 RC1 - * `ConnectionFactory` routing through `AbstractRoutingConnectionFactory`. * Utilities for schema initialization through `ResourceDatabasePopulator` and `ScriptUtils`. * Propagation and reset of Auto-Commit and Isolation Level control through `TransactionDefinition`. * Support for Entity-level converters. * Kotlin extensions for reified generics and <>. * Add pluggable mechanism to register dialects. - -[[new-features.1-0-0-M2]] -== What's New in Spring Data R2DBC 1.0.0 M2 - * Support for named parameters. - -[[new-features.1-0-0-M1]] -== What's New in Spring Data R2DBC 1.0.0 M1 - * Initial R2DBC support through `DatabaseClient`. * Initial Transaction support through `TransactionalDatabaseClient`. * Initial R2DBC Repository Support through `R2dbcRepository`. From fe313a975bb66ac5e48b070af9618d4002f637c0 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 17 Jul 2020 15:09:36 +0200 Subject: [PATCH 4/4] #215 - Polishing. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reintroduce deprecated setBeanFactory(…) method. Extract code into methods. Ensure that versioned entities are eagerly initialized to allow retries. --- .../data/r2dbc/core/R2dbcEntityTemplate.java | 136 +++++++++++------- 1 file changed, 82 insertions(+), 54 deletions(-) diff --git a/src/main/java/org/springframework/data/r2dbc/core/R2dbcEntityTemplate.java b/src/main/java/org/springframework/data/r2dbc/core/R2dbcEntityTemplate.java index 101be5f4..e58d7c63 100644 --- a/src/main/java/org/springframework/data/r2dbc/core/R2dbcEntityTemplate.java +++ b/src/main/java/org/springframework/data/r2dbc/core/R2dbcEntityTemplate.java @@ -32,6 +32,8 @@ import org.reactivestreams.Publisher; import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.core.convert.ConversionService; @@ -77,7 +79,7 @@ * @author Bogdan Ilchyshyn * @since 1.1 */ -public class R2dbcEntityTemplate implements R2dbcEntityOperations, ApplicationContextAware { +public class R2dbcEntityTemplate implements R2dbcEntityOperations, BeanFactoryAware, ApplicationContextAware { private final DatabaseClient databaseClient; @@ -123,6 +125,15 @@ public DatabaseClient getDatabaseClient() { return this.databaseClient; } + /* + * (non-Javadoc) + * @see org.springframework.beans.factory.BeanFactoryAware#setBeanFactory(org.springframework.beans.factory.BeanFactory) + * @deprecated since 1.2 in favor of #setApplicationContext. + */ + @Override + @Deprecated + public void setBeanFactory(BeanFactory beanFactory) throws BeansException {} + /* * (non-Javadoc) * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) @@ -431,33 +442,38 @@ Mono doInsert(T entity, SqlIdentifier tableName) { RelationalPersistentEntity persistentEntity = getRequiredEntity(entity); - return Mono.defer(() -> maybeCallBeforeConvert(setVersionIfNecessary(persistentEntity, entity), tableName) + T entityWithVersion = setVersionIfNecessary(persistentEntity, entity); + + return maybeCallBeforeConvert(entityWithVersion, tableName) .flatMap(beforeConvert -> { OutboundRow outboundRow = dataAccessStrategy.getOutboundRow(beforeConvert); - return maybeCallBeforeSave(beforeConvert, outboundRow, tableName).flatMap(entityToSave -> { + return maybeCallBeforeSave(beforeConvert, outboundRow, tableName) // + .flatMap(entityToSave -> doInsert(entityToSave, tableName, outboundRow)); + }); + } + + private Mono doInsert(T entity, SqlIdentifier tableName, OutboundRow outboundRow) { - StatementMapper mapper = dataAccessStrategy.getStatementMapper(); - StatementMapper.InsertSpec insert = mapper.createInsert(tableName); + StatementMapper mapper = dataAccessStrategy.getStatementMapper(); + StatementMapper.InsertSpec insert = mapper.createInsert(tableName); - for (SqlIdentifier column : outboundRow.keySet()) { - SettableValue settableValue = outboundRow.get(column); - if (settableValue.hasValue()) { - insert = insert.withColumn(column, settableValue); - } - } + for (SqlIdentifier column : outboundRow.keySet()) { + SettableValue settableValue = outboundRow.get(column); + if (settableValue.hasValue()) { + insert = insert.withColumn(column, settableValue); + } + } - PreparedOperation operation = mapper.getMappedObject(insert); + PreparedOperation operation = mapper.getMappedObject(insert); - return this.databaseClient.execute(operation) // - .filter(statement -> statement.returnGeneratedValues()) - .map(this.dataAccessStrategy.getConverter().populateIdIfNecessary(entityToSave)) // - .first() // - .defaultIfEmpty(entityToSave) // - .flatMap(saved -> maybeCallAfterSave(saved, outboundRow, tableName)); - }); - })); + return this.databaseClient.execute(operation) // + .filter(statement -> statement.returnGeneratedValues()) + .map(this.dataAccessStrategy.getConverter().populateIdIfNecessary(entity)) // + .first() // + .defaultIfEmpty(entity) // + .flatMap(saved -> maybeCallAfterSave(saved, outboundRow, tableName)); } @SuppressWarnings("unchecked") @@ -493,9 +509,22 @@ private Mono doUpdate(T entity, SqlIdentifier tableName) { RelationalPersistentEntity persistentEntity = getRequiredEntity(entity); - return maybeCallBeforeConvert(entity, tableName).flatMap(beforeConvert -> { + T entityToUse; + Criteria matchingVersionCriteria; + + if (persistentEntity.hasVersionProperty()) { - OutboundRow outboundRow = dataAccessStrategy.getOutboundRow(entity); + matchingVersionCriteria = createMatchingVersionCriteria(entity, persistentEntity); + entityToUse = incrementVersion(persistentEntity, entity); + } else { + + entityToUse = entity; + matchingVersionCriteria = null; + } + + return maybeCallBeforeConvert(entityToUse, tableName).flatMap(beforeConvert -> { + + OutboundRow outboundRow = dataAccessStrategy.getOutboundRow(beforeConvert); return maybeCallBeforeSave(beforeConvert, outboundRow, tableName) // .flatMap(entityToSave -> { @@ -504,43 +533,44 @@ private Mono doUpdate(T entity, SqlIdentifier tableName) { SettableValue id = outboundRow.remove(idColumn); Criteria criteria = Criteria.where(dataAccessStrategy.toSql(idColumn)).is(id); - T saved; - - if (persistentEntity.hasVersionProperty()) { - criteria = criteria.and(createMatchingVersionCriteria(entity, persistentEntity)); - saved = incrementVersion(persistentEntity, entity, outboundRow); - } else { - saved = entityToSave; + if (matchingVersionCriteria != null) { + criteria = criteria.and(matchingVersionCriteria); } - Update update = Update.from((Map) outboundRow); + return doUpdate(entityToSave, tableName, persistentEntity, criteria, outboundRow); + }); + }); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private Mono doUpdate(T entity, SqlIdentifier tableName, RelationalPersistentEntity persistentEntity, + Criteria criteria, OutboundRow outboundRow) { - StatementMapper mapper = dataAccessStrategy.getStatementMapper(); - StatementMapper.UpdateSpec updateSpec = mapper.createUpdate(tableName, update).withCriteria(criteria); + Update update = Update.from((Map) outboundRow); - PreparedOperation operation = mapper.getMappedObject(updateSpec); + StatementMapper mapper = dataAccessStrategy.getStatementMapper(); + StatementMapper.UpdateSpec updateSpec = mapper.createUpdate(tableName, update).withCriteria(criteria); - return this.databaseClient.execute(operation) // - .fetch() // - .rowsUpdated() // - .handle((rowsUpdated, sink) -> { + PreparedOperation operation = mapper.getMappedObject(updateSpec); - if (rowsUpdated != 0) { - return; - } + return this.databaseClient.execute(operation) // + .fetch() // + .rowsUpdated() // + .handle((rowsUpdated, sink) -> { - if (persistentEntity.hasVersionProperty()) { - sink.error(new OptimisticLockingFailureException( - formatOptimisticLockingExceptionMessage(saved, persistentEntity))); - } else { - sink.error(new TransientDataAccessResourceException( - formatTransientEntityExceptionMessage(saved, persistentEntity))); - } - }).then(maybeCallAfterSave(saved, outboundRow, tableName)); - }); - }); - } + if (rowsUpdated != 0) { + return; + } + if (persistentEntity.hasVersionProperty()) { + sink.error(new OptimisticLockingFailureException( + formatOptimisticLockingExceptionMessage(entity, persistentEntity))); + } else { + sink.error(new TransientDataAccessResourceException( + formatTransientEntityExceptionMessage(entity, persistentEntity))); + } + }).then(maybeCallAfterSave(entity, outboundRow, tableName)); + } private String formatOptimisticLockingExceptionMessage(T entity, RelationalPersistentEntity persistentEntity) { @@ -555,7 +585,7 @@ private String formatTransientEntityExceptionMessage(T entity, RelationalPer } @SuppressWarnings("unchecked") - private T incrementVersion(RelationalPersistentEntity persistentEntity, T entity, OutboundRow outboundRow) { + private T incrementVersion(RelationalPersistentEntity persistentEntity, T entity) { PersistentPropertyAccessor propertyAccessor = persistentEntity.getPropertyAccessor(entity); RelationalPersistentProperty versionProperty = persistentEntity.getVersionProperty(); @@ -569,8 +599,6 @@ private T incrementVersion(RelationalPersistentEntity persistentEntity, T Class versionPropertyType = versionProperty.getType(); propertyAccessor.setProperty(versionProperty, conversionService.convert(newVersionValue, versionPropertyType)); - outboundRow.put(versionProperty.getColumnName(), SettableValue.from(newVersionValue)); - return (T) propertyAccessor.getBean(); }