Skip to content

Commit de53d77

Browse files
committed
DatabaseClient uses SQL Supplier more lazily
This commit modifies the `DefaultDatabaseClient` implementation in order to ensure lazier usage of the `Supplier<String>` passed to the sql method (`DatabaseClient#sql(Supplier)`). Since technically `DatabaseClient` is an interface that could have 3rd party implementations, the lazyness expectation is only hinted at in the `DatabaseClient#sql` javadoc. Possible caveat: some log statements attempt to reflect the now lazily resolved SQL string. Similarly, some exceptions can capture the SQL that caused the issue if known. We expect that these always occur after the execution of the statement has been attempted (see `ResultFunction`). At this point the SQL string will be accessible and logs and exceptions should reflect it as before. Keep an eye out for such strings turning into `null` after this change, which would indicate the opposite. Backport of d72df5a See gh-29367 Closes gh-29887
1 parent e4e90bb commit de53d77

File tree

7 files changed

+207
-49
lines changed

7 files changed

+207
-49
lines changed
Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,34 +20,18 @@
2020

2121
import io.r2dbc.spi.Connection;
2222

23+
2324
/**
2425
* Union type combining {@link Function} and {@link SqlProvider} to expose the SQL that is
25-
* related to the underlying action.
26+
* related to the underlying action. The SqlProvider can support lazy / generate once semantics,
27+
* in which case {@link #getSql()} can be {@code null} until the {@code #apply(Connection)}
28+
* method is invoked.
2629
*
2730
* @author Mark Paluch
31+
* @author Simon Baslé
2832
* @since 5.3
2933
* @param <R> the type of the result of the function.
3034
*/
31-
class ConnectionFunction<R> implements Function<Connection, R>, SqlProvider {
32-
33-
private final String sql;
34-
35-
private final Function<Connection, R> function;
36-
37-
38-
ConnectionFunction(String sql, Function<Connection, R> function) {
39-
this.sql = sql;
40-
this.function = function;
41-
}
42-
43-
44-
@Override
45-
public R apply(Connection t) {
46-
return this.function.apply(t);
47-
}
48-
49-
@Override
50-
public String getSql() {
51-
return this.sql;
52-
}
35+
interface ConnectionFunction<R> extends Function<Connection, R>, SqlProvider {
5336
}
37+

spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DatabaseClient.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -79,7 +79,10 @@ public interface DatabaseClient extends ConnectionAccessor {
7979
* the execution. The SQL string can contain either native parameter
8080
* bind markers or named parameters (e.g. {@literal :foo, :bar}) when
8181
* {@link NamedParameterExpander} is enabled.
82-
* <p>Accepts {@link PreparedOperation} as SQL and binding {@link Supplier}
82+
* <p>Accepts {@link PreparedOperation} as SQL and binding {@link Supplier}.
83+
* <p>{code DatabaseClient} implementations should defer the resolution of
84+
* the SQL string as much as possible, ideally up to the point where a
85+
* {@code Subscription} happens. This is the case for the default implementation.
8386
* @param sqlSupplier a supplier for the SQL statement
8487
* @return a new {@link GenericExecuteSpec}
8588
* @see NamedParameterExpander

spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DefaultDatabaseClient.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -60,6 +60,7 @@
6060
* @author Mark Paluch
6161
* @author Mingyuan Wu
6262
* @author Bogdan Ilchyshyn
63+
* @author Simon Baslé
6364
* @since 5.3
6465
*/
6566
class DefaultDatabaseClient implements DatabaseClient {
@@ -322,9 +323,8 @@ public Mono<Void> then() {
322323
return fetch().rowsUpdated().then();
323324
}
324325

325-
private <T> FetchSpec<T> execute(Supplier<String> sqlSupplier, BiFunction<Row, RowMetadata, T> mappingFunction) {
326-
String sql = getRequiredSql(sqlSupplier);
327-
Function<Connection, Statement> statementFunction = connection -> {
326+
private ResultFunction getResultFunction(Supplier<String> sqlSupplier) {
327+
BiFunction<Connection, String, Statement> statementFunction = (connection, sql) -> {
328328
if (logger.isDebugEnabled()) {
329329
logger.debug("Executing SQL statement [" + sql + "]");
330330
}
@@ -370,16 +370,16 @@ private <T> FetchSpec<T> execute(Supplier<String> sqlSupplier, BiFunction<Row, R
370370
return statement;
371371
};
372372

373-
Function<Connection, Flux<Result>> resultFunction = connection -> {
374-
Statement statement = statementFunction.apply(connection);
375-
return Flux.from(this.filterFunction.filter(statement, DefaultDatabaseClient.this.executeFunction))
376-
.cast(Result.class).checkpoint("SQL \"" + sql + "\" [DatabaseClient]");
377-
};
373+
return new ResultFunction(sqlSupplier, statementFunction, this.filterFunction, DefaultDatabaseClient.this.executeFunction);
374+
}
375+
376+
private <T> FetchSpec<T> execute(Supplier<String> sqlSupplier, BiFunction<Row, RowMetadata, T> mappingFunction) {
377+
ResultFunction resultHandler = getResultFunction(sqlSupplier);
378378

379379
return new DefaultFetchSpec<>(
380-
DefaultDatabaseClient.this, sql,
381-
new ConnectionFunction<>(sql, resultFunction),
382-
new ConnectionFunction<>(sql, connection -> sumRowsUpdated(resultFunction, connection)),
380+
DefaultDatabaseClient.this,
381+
resultHandler,
382+
connection -> sumRowsUpdated(resultHandler, connection),
383383
mappingFunction);
384384
}
385385

spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DefaultFetchSpec.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,7 +20,6 @@
2020
import java.util.function.Function;
2121

2222
import io.r2dbc.spi.Connection;
23-
import io.r2dbc.spi.Result;
2423
import io.r2dbc.spi.Row;
2524
import io.r2dbc.spi.RowMetadata;
2625
import reactor.core.publisher.Flux;
@@ -32,31 +31,29 @@
3231
* Default {@link FetchSpec} implementation.
3332
*
3433
* @author Mark Paluch
34+
* @author Simon Baslé
3535
* @since 5.3
3636
* @param <T> the row result type
3737
*/
3838
class DefaultFetchSpec<T> implements FetchSpec<T> {
3939

4040
private final ConnectionAccessor connectionAccessor;
4141

42-
private final String sql;
43-
44-
private final Function<Connection, Flux<Result>> resultFunction;
42+
private final ResultFunction resultFunction;
4543

4644
private final Function<Connection, Mono<Integer>> updatedRowsFunction;
4745

4846
private final BiFunction<Row, RowMetadata, T> mappingFunction;
4947

5048

51-
DefaultFetchSpec(ConnectionAccessor connectionAccessor, String sql,
52-
Function<Connection, Flux<Result>> resultFunction,
49+
DefaultFetchSpec(ConnectionAccessor connectionAccessor,
50+
ResultFunction resultFunction,
5351
Function<Connection, Mono<Integer>> updatedRowsFunction,
5452
BiFunction<Row, RowMetadata, T> mappingFunction) {
5553

56-
this.sql = sql;
5754
this.connectionAccessor = connectionAccessor;
5855
this.resultFunction = resultFunction;
59-
this.updatedRowsFunction = updatedRowsFunction;
56+
this.updatedRowsFunction = new DelegateConnectionFunction<>(resultFunction, updatedRowsFunction);
6057
this.mappingFunction = mappingFunction;
6158
}
6259

@@ -70,7 +67,7 @@ public Mono<T> one() {
7067
}
7168
if (list.size() > 1) {
7269
return Mono.error(new IncorrectResultSizeDataAccessException(
73-
String.format("Query [%s] returned non unique result.", this.sql),
70+
String.format("Query [%s] returned non unique result.", this.resultFunction.getSql()),
7471
1));
7572
}
7673
return Mono.just(list.get(0));
@@ -84,7 +81,7 @@ public Mono<T> first() {
8481

8582
@Override
8683
public Flux<T> all() {
87-
return this.connectionAccessor.inConnectionMany(new ConnectionFunction<>(this.sql,
84+
return this.connectionAccessor.inConnectionMany(new DelegateConnectionFunction<>(this.resultFunction,
8885
connection -> this.resultFunction.apply(connection)
8986
.flatMap(result -> result.map(this.mappingFunction))));
9087
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.r2dbc.core;
18+
19+
import java.util.function.Function;
20+
21+
import io.r2dbc.spi.Connection;
22+
23+
import org.springframework.lang.Nullable;
24+
25+
/**
26+
* A {@link ConnectionFunction} that delegates to a {@code SqlProvider} and a plain
27+
* {@code Function}.
28+
*
29+
* @author Simon Baslé
30+
* @since 5.3.26
31+
* @param <R> the type of the result of the function.
32+
*/
33+
final class DelegateConnectionFunction<R> implements ConnectionFunction<R> {
34+
35+
private final SqlProvider sql;
36+
37+
private final Function<Connection, R> function;
38+
39+
40+
DelegateConnectionFunction(SqlProvider sql, Function<Connection, R> function) {
41+
this.sql = sql;
42+
this.function = function;
43+
}
44+
45+
46+
@Override
47+
public R apply(Connection t) {
48+
return this.function.apply(t);
49+
}
50+
51+
@Nullable
52+
@Override
53+
public String getSql() {
54+
return this.sql.getSql();
55+
}
56+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.r2dbc.core;
18+
19+
import java.util.function.BiFunction;
20+
import java.util.function.Supplier;
21+
22+
import io.r2dbc.spi.Connection;
23+
import io.r2dbc.spi.Result;
24+
import io.r2dbc.spi.Statement;
25+
import reactor.core.publisher.Flux;
26+
27+
import org.springframework.lang.Nullable;
28+
import org.springframework.util.Assert;
29+
import org.springframework.util.StringUtils;
30+
31+
/**
32+
* A {@link ConnectionFunction} that produces a {@code Flux} of {@link Result} and that
33+
* defers generation of the SQL until the function has been applied.
34+
* Beforehand, the {@code getSql()} method simply returns {@code null}. The sql String is
35+
* also memoized during application, so that subsequent calls to {@link #getSql()} return
36+
* the same {@code String} without further calls to the {@code Supplier}.
37+
*
38+
* @author Mark Paluch
39+
* @author Simon Baslé
40+
* @since 5.3.26
41+
*/
42+
final class ResultFunction implements ConnectionFunction<Flux<Result>> {
43+
44+
final Supplier<String> sqlSupplier;
45+
final BiFunction<Connection, String, Statement> statementFunction;
46+
final StatementFilterFunction filterFunction;
47+
final ExecuteFunction executeFunction;
48+
49+
@Nullable
50+
String resolvedSql = null;
51+
52+
ResultFunction(Supplier<String> sqlSupplier, BiFunction<Connection, String, Statement> statementFunction, StatementFilterFunction filterFunction, ExecuteFunction executeFunction) {
53+
this.sqlSupplier = sqlSupplier;
54+
this.statementFunction = statementFunction;
55+
this.filterFunction = filterFunction;
56+
this.executeFunction = executeFunction;
57+
}
58+
59+
@Override
60+
public Flux<Result> apply(Connection connection) {
61+
String sql = this.sqlSupplier.get();
62+
Assert.state(StringUtils.hasText(sql), "SQL returned by supplier must not be empty");
63+
this.resolvedSql = sql;
64+
Statement statement = this.statementFunction.apply(connection, sql);
65+
return Flux.from(this.filterFunction.filter(statement, this.executeFunction))
66+
.cast(Result.class).checkpoint("SQL \"" + sql + "\" [DatabaseClient]");
67+
}
68+
69+
@Nullable
70+
@Override
71+
public String getSql() {
72+
return this.resolvedSql;
73+
}
74+
}

spring-r2dbc/src/test/java/org/springframework/r2dbc/core/DefaultDatabaseClientUnitTests.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.springframework.r2dbc.core;
1818

1919
import java.util.Arrays;
20+
import java.util.List;
21+
import java.util.concurrent.atomic.AtomicInteger;
2022

2123
import io.r2dbc.spi.Connection;
2224
import io.r2dbc.spi.ConnectionFactory;
@@ -64,6 +66,7 @@
6466
* @author Mark Paluch
6567
* @author Ferdinand Jacobs
6668
* @author Jens Schauder
69+
* @author Simon Baslé
6770
*/
6871
@ExtendWith(MockitoExtension.class)
6972
@MockitoSettings(strictness = Strictness.LENIENT)
@@ -397,6 +400,47 @@ void shouldApplySimpleStatementFilterFunctions() {
397400
inOrder.verifyNoMoreInteractions();
398401
}
399402

403+
@Test
404+
void sqlSupplierInvocationIsDeferredUntilSubscription() {
405+
// We'll have either 2 or 3 rows, depending on the subscription and the generated SQL
406+
MockRowMetadata metadata = MockRowMetadata.builder().columnMetadata(
407+
MockColumnMetadata.builder().name("id").javaType(Integer.class).build()).build();
408+
final MockRow row1 = MockRow.builder().identified("id", Integer.class, 1).build();
409+
final MockRow row2 = MockRow.builder().identified("id", Integer.class, 2).build();
410+
final MockRow row3 = MockRow.builder().identified("id", Integer.class, 3).build();
411+
// Set up 2 mock statements
412+
mockStatementFor("SELECT id FROM test WHERE id < '3'", MockResult.builder()
413+
.rowMetadata(metadata)
414+
.row(row1, row2).build());
415+
mockStatementFor("SELECT id FROM test WHERE id < '4'", MockResult.builder()
416+
.rowMetadata(metadata)
417+
.row(row1, row2, row3).build());
418+
// Create the client
419+
DatabaseClient databaseClient = this.databaseClientBuilder.build();
420+
421+
AtomicInteger invoked = new AtomicInteger();
422+
// Assemble a publisher, but don't subscribe yet
423+
Mono<List<Integer>> operation = databaseClient
424+
.sql(() -> {
425+
int idMax = 2 + invoked.incrementAndGet();
426+
return String.format("SELECT id FROM test WHERE id < '%s'", idMax);
427+
})
428+
.map(r -> r.get("id", Integer.class))
429+
.all()
430+
.collectList();
431+
432+
assertThat(invoked).as("invoked (before subscription)").hasValue(0);
433+
434+
List<Integer> rows = operation.block();
435+
assertThat(invoked).as("invoked (after 1st subscription)").hasValue(1);
436+
assertThat(rows).containsExactly(1, 2);
437+
438+
rows = operation.block();
439+
assertThat(invoked).as("invoked (after 2nd subscription)").hasValue(2);
440+
assertThat(rows).containsExactly(1, 2, 3);
441+
}
442+
443+
400444
private Statement mockStatement() {
401445
return mockStatementFor(null, null);
402446
}

0 commit comments

Comments
 (0)