From 250a467c5bb5559c4a079c04ba6e7c005bc3409f Mon Sep 17 00:00:00 2001 From: Michael Reiche <48999328+mikereiche@users.noreply.github.com> Date: Fri, 29 Jul 2022 14:16:56 -0700 Subject: [PATCH 1/2] Fix Reactive Parameter Handling. Closes #1516. --- .../query/AbstractCouchbaseQueryBase.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/repository/query/AbstractCouchbaseQueryBase.java b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/repository/query/AbstractCouchbaseQueryBase.java index 6a7a5efab..e34d8ca6e 100644 --- a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/repository/query/AbstractCouchbaseQueryBase.java +++ b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/repository/query/AbstractCouchbaseQueryBase.java @@ -18,6 +18,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.reactivestreams.Publisher; import org.springframework.data.couchbase.core.CouchbaseOperations; import org.springframework.data.couchbase.core.ExecutableFindByQueryOperation.ExecutableFindByQuery; import org.springframework.data.couchbase.core.query.Query; @@ -108,15 +109,17 @@ EntityInstantiators getInstantiators() { * @see org.springframework.data.repository.query.RepositoryQuery#execute(java.lang.Object[]) */ public Object execute(Object[] parameters) { - ReactiveCouchbaseParameterAccessor accessor = new ReactiveCouchbaseParameterAccessor(getQueryMethod(), parameters); - - Object result = accessor.resolveParameters().map(this::executeDeferred); - return ((Mono) result).block() ; + return method.hasReactiveWrapperParameter() ? accessor.resolveParameters().flatMapMany(this::executeDeferred) + : execute(accessor); } - private Object executeDeferred(ReactiveCouchbaseParameterAccessor parameterAccessor) { - return execute(parameterAccessor); + private Publisher executeDeferred(ReactiveCouchbaseParameterAccessor parameterAccessor/*Object[] parameters*/) { + //ReactiveCouchbaseParameterAccessor parameterAccessor = new ReactiveCouchbaseParameterAccessor(method, parameters); + if (getQueryMethod().isCollectionQuery()) { + return Flux.defer(() -> (Publisher) execute(parameterAccessor)); + } + return Mono.defer(() -> (Mono) execute(parameterAccessor)); } private Object execute(ParametersParameterAccessor parameterAccessor) { From afd8fd6690d0f34050f26f0abf0f493a3771c66b Mon Sep 17 00:00:00 2001 From: Michael Reiche <48999328+mikereiche@users.noreply.github.com> Date: Fri, 29 Jul 2022 14:21:58 -0700 Subject: [PATCH 2/2] Fix reactive parameter handling. Closes #1516. --- .../repository/query/AbstractCouchbaseQueryBase.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/repository/query/AbstractCouchbaseQueryBase.java b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/repository/query/AbstractCouchbaseQueryBase.java index e34d8ca6e..2818a1014 100644 --- a/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/repository/query/AbstractCouchbaseQueryBase.java +++ b/spring-data-couchbase/src/main/java/org/springframework/data/couchbase/repository/query/AbstractCouchbaseQueryBase.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 the original author or authors + * Copyright 2020-2022 the original author or authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -114,8 +114,7 @@ public Object execute(Object[] parameters) { : execute(accessor); } - private Publisher executeDeferred(ReactiveCouchbaseParameterAccessor parameterAccessor/*Object[] parameters*/) { - //ReactiveCouchbaseParameterAccessor parameterAccessor = new ReactiveCouchbaseParameterAccessor(method, parameters); + private Publisher executeDeferred(ReactiveCouchbaseParameterAccessor parameterAccessor) { if (getQueryMethod().isCollectionQuery()) { return Flux.defer(() -> (Publisher) execute(parameterAccessor)); }