Skip to content

Batch non-root deletes across aggregates via #deleteAll #1230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ public interface JdbcAggregateOperations {
*/
<T> void deleteById(Object id, Class<T> domainType);

/**
* Deletes all aggregates identified by their aggregate root ids.
*
* @param ids the ids of the aggregate roots of the aggregates to be deleted. Must not be {@code null}.
* @param domainType the type of the aggregate root.
* @param <T> the type of the aggregate root.
*/
<T> void deleteAllById(Iterable<?> ids, Class<T> domainType);

/**
* Delete an aggregate identified by it's aggregate root.
*
Expand All @@ -96,6 +105,15 @@ public interface JdbcAggregateOperations {
*/
void deleteAll(Class<?> domainType);

/**
* Delete all aggregates identified by their aggregate roots.
*
* @param aggregateRoots to delete. Must not be {@code null}.
* @param domainType type of the aggregate roots to be deleted. Must not be {@code null}.
* @param <T> the type of the aggregate roots.
*/
<T> void deleteAll(Iterable<? extends T> aggregateRoots, Class<T> domainType);

/**
* Counts the number of aggregates of a given type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
Expand All @@ -32,13 +34,14 @@
import org.springframework.data.mapping.IdentifierAccessor;
import org.springframework.data.mapping.callback.EntityCallbacks;
import org.springframework.data.relational.core.conversion.AggregateChange;
import org.springframework.data.relational.core.conversion.RootAggregateChange;
import org.springframework.data.relational.core.conversion.BatchingAggregateChange;
import org.springframework.data.relational.core.conversion.DeleteAggregateChange;
import org.springframework.data.relational.core.conversion.MutableAggregateChange;
import org.springframework.data.relational.core.conversion.RelationalEntityDeleteWriter;
import org.springframework.data.relational.core.conversion.RelationalEntityInsertWriter;
import org.springframework.data.relational.core.conversion.RelationalEntityUpdateWriter;
import org.springframework.data.relational.core.conversion.RelationalEntityVersionUtils;
import org.springframework.data.relational.core.conversion.RootAggregateChange;
import org.springframework.data.relational.core.mapping.RelationalMappingContext;
import org.springframework.data.relational.core.mapping.RelationalPersistentEntity;
import org.springframework.data.relational.core.mapping.RelationalPersistentProperty;
Expand Down Expand Up @@ -273,6 +276,25 @@ public <S> void deleteById(Object id, Class<S> domainType) {
deleteTree(id, null, domainType);
}

@Override
public <T> void deleteAllById(Iterable<?> ids, Class<T> domainType) {

Assert.isTrue(ids.iterator().hasNext(), "Ids must not be empty!");

BatchingAggregateChange<T, DeleteAggregateChange<T>> batchingAggregateChange = BatchingAggregateChange
.forDelete(domainType);

ids.forEach(id -> {
DeleteAggregateChange<T> change = createDeletingChange(id, null, domainType);
triggerBeforeDelete(null, id, change);
batchingAggregateChange.add(change);
});

executor.executeDelete(batchingAggregateChange);

ids.forEach(id -> triggerAfterDelete(null, id, batchingAggregateChange));
}

@Override
public void deleteAll(Class<?> domainType) {

Expand All @@ -282,6 +304,28 @@ public void deleteAll(Class<?> domainType) {
executor.executeDelete(change);
}

@Override
public <T> void deleteAll(Iterable<? extends T> instances, Class<T> domainType) {

Assert.isTrue(instances.iterator().hasNext(), "Aggregate instances must not be empty!");

BatchingAggregateChange<T, DeleteAggregateChange<T>> batchingAggregateChange = BatchingAggregateChange
.forDelete(domainType);
Map<Object, T> instancesBeforeExecute = new LinkedHashMap<>();

instances.forEach(instance -> {
Object id = context.getRequiredPersistentEntity(domainType).getIdentifierAccessor(instance)
.getRequiredIdentifier();
DeleteAggregateChange<T> change = createDeletingChange(id, instance, domainType);
instancesBeforeExecute.put(id, triggerBeforeDelete(instance, id, change));
batchingAggregateChange.add(change);
});

executor.executeDelete(batchingAggregateChange);

instancesBeforeExecute.forEach((id, instance) -> triggerAfterDelete(instance, id, batchingAggregateChange));
}

private <T> T afterExecute(AggregateChange<T> change, T entityAfterExecution) {

Object identifier = context.getRequiredPersistentEntity(change.getEntityType())
Expand All @@ -292,8 +336,7 @@ private <T> T afterExecute(AggregateChange<T> change, T entityAfterExecution) {
return triggerAfterSave(entityAfterExecution, change);
}

private <T> RootAggregateChange<T> beforeExecute(T aggregateRoot,
Function<T, RootAggregateChange<T>> changeCreator) {
private <T> RootAggregateChange<T> beforeExecute(T aggregateRoot, Function<T, RootAggregateChange<T>> changeCreator) {

Assert.notNull(aggregateRoot, "Aggregate instance must not be null!");

Expand Down Expand Up @@ -376,8 +419,7 @@ private <T> RootAggregateChange<T> createUpdateChange(EntityAndPreviousVersion<T

RootAggregateChange<T> aggregateChange = MutableAggregateChange.forSave(entityAndVersion.entity,
entityAndVersion.version);
new RelationalEntityUpdateWriter<T>(context).write(entityAndVersion.entity,
aggregateChange);
new RelationalEntityUpdateWriter<T>(context).write(entityAndVersion.entity, aggregateChange);
return aggregateChange;
}

Expand Down Expand Up @@ -420,7 +462,7 @@ private <T> RelationalPersistentEntity<T> getRequiredPersistentEntity(T instance
return (RelationalPersistentEntity<T>) context.getRequiredPersistentEntity(instance.getClass());
}

private <T> MutableAggregateChange<T> createDeletingChange(Object id, @Nullable T entity, Class<T> domainType) {
private <T> DeleteAggregateChange<T> createDeletingChange(Object id, @Nullable T entity, Class<T> domainType) {

Number previousVersion = null;
if (entity != null) {
Expand All @@ -429,7 +471,7 @@ private <T> MutableAggregateChange<T> createDeletingChange(Object id, @Nullable
previousVersion = RelationalEntityVersionUtils.getVersionNumberFromEntity(entity, persistentEntity, converter);
}
}
MutableAggregateChange<T> aggregateChange = MutableAggregateChange.forDelete(domainType, previousVersion);
DeleteAggregateChange<T> aggregateChange = MutableAggregateChange.forDelete(domainType, previousVersion);
jdbcEntityDeleteWriter.write(id, aggregateChange);
return aggregateChange;
}
Expand Down Expand Up @@ -482,7 +524,7 @@ private <T> T triggerAfterSave(T aggregateRoot, AggregateChange<T> change) {
return entityCallbacks.callback(AfterSaveCallback.class, aggregateRoot);
}

private <T> void triggerAfterDelete(@Nullable T aggregateRoot, Object id, MutableAggregateChange<T> change) {
private <T> void triggerAfterDelete(@Nullable T aggregateRoot, Object id, AggregateChange<T> change) {

publisher.publishEvent(new AfterDeleteEvent<>(Identifier.of(id), aggregateRoot, change));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,13 @@ public void delete(T instance) {

@Override
public void deleteAllById(Iterable<? extends ID> ids) {
ids.forEach(it -> entityOperations.deleteById(it, entity.getType()));
entityOperations.deleteAllById(ids, entity.getType());
}

@Transactional
@Override
@SuppressWarnings("unchecked")
public void deleteAll(Iterable<? extends T> entities) {
entities.forEach(it -> entityOperations.delete(it, (Class<T>) it.getClass()));
entityOperations.deleteAll(entities, entity.getType());
}

@Transactional
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,38 @@ void saveAndDeleteAllWithReferencedEntity() {
softly.assertAll();
}

@Test // GH-537
@EnabledOnFeature(SUPPORTS_QUOTED_IDS)
void saveAndDeleteAllByAggregateRootsWithReferencedEntity() {
LegoSet legoSet1 = template.save(legoSet);
LegoSet legoSet2 = template.save(createLegoSet("Some Name"));

template.deleteAll(List.of(legoSet1, legoSet2), LegoSet.class);

SoftAssertions softly = new SoftAssertions();

assertThat(template.findAll(LegoSet.class)).isEmpty();
assertThat(template.findAll(Manual.class)).isEmpty();

softly.assertAll();
}

@Test // GH-537
@EnabledOnFeature(SUPPORTS_QUOTED_IDS)
void saveAndDeleteAllByIdsWithReferencedEntity() {
LegoSet legoSet1 = template.save(legoSet);
LegoSet legoSet2 = template.save(createLegoSet("Some Name"));

template.deleteAllById(List.of(legoSet1.id, legoSet2.id), LegoSet.class);

SoftAssertions softly = new SoftAssertions();

assertThat(template.findAll(LegoSet.class)).isEmpty();
assertThat(template.findAll(Manual.class)).isEmpty();

softly.assertAll();
}

@Test // DATAJDBC-112
@EnabledOnFeature({ SUPPORTS_QUOTED_IDS, SUPPORTS_GENERATED_IDS_IN_REFERENCED_ENTITIES })
void updateReferencedEntityFromNull() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,19 @@ static <T> BatchingAggregateChange<T, RootAggregateChange<T>> forSave(Class<T> e

return new SaveBatchingAggregateChange<>(entityClass);
}

/**
* Factory method to create a {@link BatchingAggregateChange} for deleting entities.
*
* @param entityClass aggregate root type.
* @param <T> entity type.
* @return the {@link BatchingAggregateChange} for deleting root entities.
* @since 3.0
*/
static <T> BatchingAggregateChange<T, DeleteAggregateChange<T>> forDelete(Class<T> entityClass) {

Assert.notNull(entityClass, "Entity class must not be null");

return new DeleteBatchingAggregateChange<>(entityClass);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
* @author Chirag Tailor
* @since 2.0
*/
class DefaultAggregateChange<T> implements MutableAggregateChange<T> {

private final Kind kind;
public class DeleteAggregateChange<T> implements MutableAggregateChange<T> {

/** Type of the aggregate root to be changed */
private final Class<T> entityType;
Expand All @@ -42,9 +40,7 @@ class DefaultAggregateChange<T> implements MutableAggregateChange<T> {
/** The previous version assigned to the instance being changed, if available */
@Nullable private final Number previousVersion;

public DefaultAggregateChange(Kind kind, Class<T> entityType, @Nullable Number previousVersion) {

this.kind = kind;
public DeleteAggregateChange(Class<T> entityType, @Nullable Number previousVersion) {
this.entityType = entityType;
this.previousVersion = previousVersion;
}
Expand All @@ -64,7 +60,7 @@ public void addAction(DbAction<?> action) {

@Override
public Kind getKind() {
return this.kind;
return Kind.DELETE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package org.springframework.data.relational.core.conversion;

import org.springframework.data.mapping.PersistentPropertyPath;
import org.springframework.data.relational.core.mapping.RelationalPersistentProperty;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import static java.util.Collections.*;

/**
* A {@link BatchingAggregateChange} implementation for delete changes that can contain actions for one or more delete
* operations. When consumed, actions are yielded in the appropriate entity tree order with deletes carried out from
* leaves to root. All operations that can be batched are grouped and combined to offer the ability for an optimized
* batch operation to be used.
*
* @author Chirag Tailor
* @since 3.0
*/
public class DeleteBatchingAggregateChange<T> implements BatchingAggregateChange<T, DeleteAggregateChange<T>> {

private static final Comparator<PersistentPropertyPath<RelationalPersistentProperty>> pathLengthComparator = //
Comparator.comparing(PersistentPropertyPath::getLength);

private final Class<T> entityType;
private final List<DbAction.DeleteRoot<T>> rootActions = new ArrayList<>();
private final List<DbAction.AcquireLockRoot<?>> lockActions = new ArrayList<>();
private final Map<PersistentPropertyPath<RelationalPersistentProperty>, List<DbAction.Delete<Object>>> deleteActions = //
new HashMap<>();

public DeleteBatchingAggregateChange(Class<T> entityType) {
this.entityType = entityType;
}

@Override
public Kind getKind() {
return Kind.DELETE;
}

@Override
public Class<T> getEntityType() {
return entityType;
}

@Override
public void forEachAction(Consumer<? super DbAction<?>> consumer) {

lockActions.forEach(consumer);
deleteActions.entrySet().stream().sorted(Map.Entry.comparingByKey(pathLengthComparator.reversed()))
.forEach((entry) -> {
List<DbAction.Delete<Object>> deletes = entry.getValue();
if (deletes.size() > 1) {
consumer.accept(new DbAction.BatchDelete<>(deletes));
} else {
deletes.forEach(consumer);
}
});
rootActions.forEach(consumer);
}

@Override
public void add(DeleteAggregateChange<T> aggregateChange) {

aggregateChange.forEachAction(action -> {
if (action instanceof DbAction.DeleteRoot<?> deleteRootAction) {
//noinspection unchecked
rootActions.add((DbAction.DeleteRoot<T>) deleteRootAction);
} else if (action instanceof DbAction.Delete<?> deleteAction) {
// noinspection unchecked
addDelete((DbAction.Delete<Object>) deleteAction);
} else if (action instanceof DbAction.AcquireLockRoot<?> lockRootAction) {
lockActions.add(lockRootAction);
}
});
}

private void addDelete(DbAction.Delete<Object> action) {

PersistentPropertyPath<RelationalPersistentProperty> propertyPath = action.getPropertyPath();
deleteActions.merge(propertyPath, new ArrayList<>(singletonList(action)), (actions, defaultValue) -> {
actions.add(action);
return actions;
});
}
}
Loading