diff --git a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/repository/query/AbstractCouchbaseQueryBase.java b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/repository/query/AbstractCouchbaseQueryBase.java index 6a7a5efab..2818a1014 100644 --- a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/repository/query/AbstractCouchbaseQueryBase.java +++ b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/repository/query/AbstractCouchbaseQueryBase.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 the original author or authors + * Copyright 2020-2022 the original author or authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.reactivestreams.Publisher; import org.springframework.data.couchbase.core.CouchbaseOperations; import org.springframework.data.couchbase.core.ExecutableFindByQueryOperation.ExecutableFindByQuery; import org.springframework.data.couchbase.core.query.Query; @@ -108,15 +109,16 @@ EntityInstantiators getInstantiators() { * @see org.springframework.data.repository.query.RepositoryQuery#execute(java.lang.Object[]) */ public Object execute(Object[] parameters) { - ReactiveCouchbaseParameterAccessor accessor = new ReactiveCouchbaseParameterAccessor(getQueryMethod(), parameters); - - Object result = accessor.resolveParameters().map(this::executeDeferred); - return ((Mono) result).block() ; + return method.hasReactiveWrapperParameter() ? accessor.resolveParameters().flatMapMany(this::executeDeferred) + : execute(accessor); } - private Object executeDeferred(ReactiveCouchbaseParameterAccessor parameterAccessor) { - return execute(parameterAccessor); + private Publisher executeDeferred(ReactiveCouchbaseParameterAccessor parameterAccessor) { + if (getQueryMethod().isCollectionQuery()) { + return Flux.defer(() -> (Publisher) execute(parameterAccessor)); + } + return Mono.defer(() -> (Mono) execute(parameterAccessor)); } private Object execute(ParametersParameterAccessor parameterAccessor) {