15
15
*/
16
16
package org .springframework .data .neo4j .core ;
17
17
18
- import reactor .core .publisher .Flux ;
19
- import reactor .core .publisher .Mono ;
20
- import reactor .util .function .Tuple2 ;
21
-
22
- import java .util .Map ;
23
- import java .util .function .BiFunction ;
24
- import java .util .function .Function ;
25
- import java .util .function .Supplier ;
26
-
27
18
import org .neo4j .driver .Driver ;
28
19
import org .neo4j .driver .Record ;
29
20
import org .neo4j .driver .reactive .RxQueryRunner ;
41
32
import org .springframework .data .neo4j .core .transaction .ReactiveNeo4jTransactionManager ;
42
33
import org .springframework .lang .Nullable ;
43
34
import org .springframework .util .Assert ;
35
+ import reactor .core .publisher .Flux ;
36
+ import reactor .core .publisher .Mono ;
37
+ import reactor .util .function .Tuple2 ;
38
+
39
+ import java .util .Map ;
40
+ import java .util .function .BiFunction ;
41
+ import java .util .function .Function ;
42
+ import java .util .function .Supplier ;
44
43
45
44
/**
46
45
* Reactive variant of the {@link Neo4jClient}.
@@ -54,13 +53,15 @@ class DefaultReactiveNeo4jClient implements ReactiveNeo4jClient {
54
53
55
54
private final Driver driver ;
56
55
private final TypeSystem typeSystem ;
56
+ private final ReactiveDatabaseSelectionProvider databaseSelectionProvider ;
57
57
private final ConversionService conversionService ;
58
58
private final Neo4jPersistenceExceptionTranslator persistenceExceptionTranslator = new Neo4jPersistenceExceptionTranslator ();
59
59
60
- DefaultReactiveNeo4jClient (Driver driver ) {
60
+ DefaultReactiveNeo4jClient (Driver driver , @ Nullable ReactiveDatabaseSelectionProvider databaseSelectionProvider ) {
61
61
62
62
this .driver = driver ;
63
63
this .typeSystem = driver .defaultTypeSystem ();
64
+ this .databaseSelectionProvider = databaseSelectionProvider ;
64
65
this .conversionService = new DefaultConversionService ();
65
66
new Neo4jConversions ().registerConvertersIn ((ConverterRegistry ) conversionService );
66
67
}
@@ -180,7 +181,7 @@ public Mono<ResultSummary> run() {
180
181
181
182
class DefaultRecordFetchSpec <T > implements RecordFetchSpec <T >, MappingSpec <T > {
182
183
183
- private final String targetDatabase ;
184
+ private final Mono < DatabaseSelection > targetDatabase ;
184
185
185
186
private final Supplier <String > cypherSupplier ;
186
187
@@ -192,9 +193,18 @@ class DefaultRecordFetchSpec<T> implements RecordFetchSpec<T>, MappingSpec<T> {
192
193
this (targetDatabase , cypherSupplier , parameters , null );
193
194
}
194
195
195
- DefaultRecordFetchSpec (String targetDatabase , Supplier <String > cypherSupplier , NamedParameters parameters ,
196
+ DefaultRecordFetchSpec (@ Nullable String targetDatabase , Supplier <String > cypherSupplier , NamedParameters parameters ,
196
197
@ Nullable BiFunction <TypeSystem , Record , T > mappingFunction ) {
197
- this .targetDatabase = targetDatabase ;
198
+
199
+ this .targetDatabase = Mono .defer (() -> {
200
+ if (targetDatabase != null ) {
201
+ return ReactiveDatabaseSelectionProvider .createStaticDatabaseSelectionProvider (targetDatabase )
202
+ .getDatabaseSelection ();
203
+ } else if (databaseSelectionProvider != null ) {
204
+ return databaseSelectionProvider .getDatabaseSelection ();
205
+ }
206
+ return Mono .just (DatabaseSelection .undecided ());
207
+ });
198
208
this .cypherSupplier = cypherSupplier ;
199
209
this .parameters = parameters ;
200
210
this .mappingFunction = mappingFunction ;
@@ -229,33 +239,36 @@ Flux<T> executeWith(Tuple2<String, Map<String, Object>> t, RxQueryRunner runner)
229
239
@ Override
230
240
public Mono <T > one () {
231
241
232
- return doInQueryRunnerForMono (targetDatabase ,
233
- (runner ) -> prepareStatement ().flatMapMany (t -> executeWith (t , runner )).singleOrEmpty ())
242
+ return targetDatabase . flatMap ( databaseSelection -> doInQueryRunnerForMono (databaseSelection . getValue () ,
243
+ (runner ) -> prepareStatement ().flatMapMany (t -> executeWith (t , runner )).singleOrEmpty ()))
234
244
.onErrorMap (RuntimeException .class , DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException );
235
245
}
236
246
237
247
@ Override
238
248
public Mono <T > first () {
239
249
240
- return doInQueryRunnerForMono (targetDatabase ,
241
- runner -> prepareStatement ().flatMapMany (t -> executeWith (t , runner )).next ())
250
+ return targetDatabase . flatMap ( databaseSelection -> doInQueryRunnerForMono (databaseSelection . getValue () ,
251
+ runner -> prepareStatement ().flatMapMany (t -> executeWith (t , runner )).next ()))
242
252
.onErrorMap (RuntimeException .class , DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException );
243
253
}
244
254
245
255
@ Override
246
256
public Flux <T > all () {
247
257
248
- return doInStatementRunnerForFlux (targetDatabase ,
249
- runner -> prepareStatement ().flatMapMany (t -> executeWith (t , runner ))).onErrorMap (RuntimeException .class ,
250
- DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException );
258
+ return targetDatabase .flatMapMany (databaseSelection ->
259
+ doInStatementRunnerForFlux (databaseSelection .getValue (),
260
+ runner -> prepareStatement ().flatMapMany (t -> executeWith (t , runner )))
261
+ )
262
+ .onErrorMap (RuntimeException .class , DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException );
251
263
}
252
264
253
265
Mono <ResultSummary > run () {
254
266
255
- return doInQueryRunnerForMono (targetDatabase , runner -> prepareStatement ().flatMap (t -> {
256
- RxResult rxResult = runner .run (t .getT1 (), t .getT2 ());
257
- return Flux .from (rxResult .records ()).then (Mono .from (rxResult .consume ()).map (ResultSummaries ::process ));
258
- })).onErrorMap (RuntimeException .class , DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException );
267
+ return targetDatabase .flatMap (databaseSelection ->
268
+ doInQueryRunnerForMono (databaseSelection .getValue (), runner -> prepareStatement ().flatMap (t -> {
269
+ RxResult rxResult = runner .run (t .getT1 (), t .getT2 ());
270
+ return Flux .from (rxResult .records ()).then (Mono .from (rxResult .consume ()).map (ResultSummaries ::process ));
271
+ }))).onErrorMap (RuntimeException .class , DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException );
259
272
}
260
273
}
261
274
0 commit comments