Skip to content

Commit e9f9d66

Browse files
authored
Fix reactive parameter handling. (#1519)
* Fix Reactive Parameter Handling. Closes #1516.
1 parent fea7ed8 commit e9f9d66

File tree

1 file changed

+9
-7
lines changed

1 file changed

+9
-7
lines changed

spring-data-couchbase/src/main/java/org/springframework/data/couchbase/repository/query/AbstractCouchbaseQueryBase.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors
2+
* Copyright 2020-2022 the original author or authors
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818
import reactor.core.publisher.Flux;
1919
import reactor.core.publisher.Mono;
2020

21+
import org.reactivestreams.Publisher;
2122
import org.springframework.data.couchbase.core.CouchbaseOperations;
2223
import org.springframework.data.couchbase.core.ExecutableFindByQueryOperation.ExecutableFindByQuery;
2324
import org.springframework.data.couchbase.core.query.Query;
@@ -108,15 +109,16 @@ EntityInstantiators getInstantiators() {
108109
* @see org.springframework.data.repository.query.RepositoryQuery#execute(java.lang.Object[])
109110
*/
110111
public Object execute(Object[] parameters) {
111-
112112
ReactiveCouchbaseParameterAccessor accessor = new ReactiveCouchbaseParameterAccessor(getQueryMethod(), parameters);
113-
114-
Object result = accessor.resolveParameters().map(this::executeDeferred);
115-
return ((Mono<Object>) result).block() ;
113+
return method.hasReactiveWrapperParameter() ? accessor.resolveParameters().flatMapMany(this::executeDeferred)
114+
: execute(accessor);
116115
}
117116

118-
private Object executeDeferred(ReactiveCouchbaseParameterAccessor parameterAccessor) {
119-
return execute(parameterAccessor);
117+
private Publisher<Object> executeDeferred(ReactiveCouchbaseParameterAccessor parameterAccessor) {
118+
if (getQueryMethod().isCollectionQuery()) {
119+
return Flux.defer(() -> (Publisher<Object>) execute(parameterAccessor));
120+
}
121+
return Mono.defer(() -> (Mono<Object>) execute(parameterAccessor));
120122
}
121123

122124
private Object execute(ParametersParameterAccessor parameterAccessor) {

0 commit comments

Comments
 (0)