Skip to content

Commit d72df5a

Browse files
committed
DatabaseClient uses SQL Supplier more lazily
This commit modifies the `DefaultDatabaseClient` implementation in order to ensure lazier usage of the `Supplier<String>` passed to the sql method (`DatabaseClient#sql(Supplier)`). Since technically `DatabaseClient` is an interface that could have 3rd party implementations, the lazyness expectation is only hinted at in the `DatabaseClient#sql` javadoc. Possible caveat: some log statements attempt to reflect the now lazily resolved SQL string. Similarly, some exceptions can capture the SQL that caused the issue if known. We expect that these always occur after the execution of the statement has been attempted (see `ResultFunction`). At this point the SQL string will be accessible and logs and exceptions should reflect it as before. Keep an eye out for such strings turning into `null` after this change, which would indicate the opposite. Closes gh-29367
1 parent 767f59a commit d72df5a

File tree

7 files changed

+204
-52
lines changed

7 files changed

+204
-52
lines changed
Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,34 +20,19 @@
2020

2121
import io.r2dbc.spi.Connection;
2222

23+
2324
/**
2425
* Union type combining {@link Function} and {@link SqlProvider} to expose the SQL that is
25-
* related to the underlying action.
26+
* related to the underlying action. The SqlProvider can support lazy / generate once semantics,
27+
* in which case {@link #getSql()} can be {@code null} until the {@code #apply(Connection)}
28+
* method is invoked.
2629
*
2730
* @author Mark Paluch
31+
* @author Simon Baslé
2832
* @since 5.3
2933
* @param <R> the type of the result of the function.
3034
*/
31-
class ConnectionFunction<R> implements Function<Connection, R>, SqlProvider {
32-
33-
private final String sql;
34-
35-
private final Function<Connection, R> function;
36-
37-
38-
ConnectionFunction(String sql, Function<Connection, R> function) {
39-
this.sql = sql;
40-
this.function = function;
41-
}
42-
43-
44-
@Override
45-
public R apply(Connection t) {
46-
return this.function.apply(t);
47-
}
48-
49-
@Override
50-
public String getSql() {
51-
return this.sql;
52-
}
35+
sealed interface ConnectionFunction<R> extends Function<Connection, R>, SqlProvider
36+
permits DelegateConnectionFunction, ResultFunction {
5337
}
38+

spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DatabaseClient.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -83,7 +83,10 @@ public interface DatabaseClient extends ConnectionAccessor {
8383
* the execution. The SQL string can contain either native parameter
8484
* bind markers or named parameters (e.g. {@literal :foo, :bar}) when
8585
* {@link NamedParameterExpander} is enabled.
86-
* <p>Accepts {@link PreparedOperation} as SQL and binding {@link Supplier}
86+
* <p>Accepts {@link PreparedOperation} as SQL and binding {@link Supplier}.
87+
* <p>{code DatabaseClient} implementations should defer the resolution of
88+
* the SQL string as much as possible, ideally up to the point where a
89+
* {@code Subscription} happens. This is the case for the default implementation.
8790
* @param sqlSupplier a supplier for the SQL statement
8891
* @return a new {@link GenericExecuteSpec}
8992
* @see NamedParameterExpander

spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DefaultDatabaseClient.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -63,6 +63,7 @@
6363
* @author Mark Paluch
6464
* @author Mingyuan Wu
6565
* @author Bogdan Ilchyshyn
66+
* @author Simon Baslé
6667
* @since 5.3
6768
*/
6869
class DefaultDatabaseClient implements DatabaseClient {
@@ -346,8 +347,7 @@ public Mono<Void> then() {
346347
}
347348

348349
private ResultFunction getResultFunction(Supplier<String> sqlSupplier) {
349-
String sql = getRequiredSql(sqlSupplier);
350-
Function<Connection, Statement> statementFunction = connection -> {
350+
BiFunction<Connection, String, Statement> statementFunction = (connection, sql) -> {
351351
if (logger.isDebugEnabled()) {
352352
logger.debug("Executing SQL statement [" + sql + "]");
353353
}
@@ -393,28 +393,22 @@ private ResultFunction getResultFunction(Supplier<String> sqlSupplier) {
393393
return statement;
394394
};
395395

396-
Function<Connection, Flux<Result>> resultFunction = connection -> {
397-
Statement statement = statementFunction.apply(connection);
398-
return Flux.from(this.filterFunction.filter(statement, DefaultDatabaseClient.this.executeFunction))
399-
.cast(Result.class).checkpoint("SQL \"" + sql + "\" [DatabaseClient]");
400-
};
401-
402-
return new ResultFunction(resultFunction, sql);
396+
return new ResultFunction(sqlSupplier, statementFunction, this.filterFunction, DefaultDatabaseClient.this.executeFunction);
403397
}
404398

405399
private <T> FetchSpec<T> execute(Supplier<String> sqlSupplier, Function<Result, Publisher<T>> resultAdapter) {
406400
ResultFunction resultHandler = getResultFunction(sqlSupplier);
407401

408402
return new DefaultFetchSpec<>(
409-
DefaultDatabaseClient.this, resultHandler.sql(),
410-
new ConnectionFunction<>(resultHandler.sql(), resultHandler.function()),
411-
new ConnectionFunction<>(resultHandler.sql(), connection -> sumRowsUpdated(resultHandler.function(), connection)),
403+
DefaultDatabaseClient.this,
404+
resultHandler,
405+
connection -> sumRowsUpdated(resultHandler, connection),
412406
resultAdapter);
413407
}
414408

415409
private <T> Flux<T> flatMap(Supplier<String> sqlSupplier, Function<Result, Publisher<T>> mappingFunction) {
416410
ResultFunction resultHandler = getResultFunction(sqlSupplier);
417-
ConnectionFunction<Flux<T>> connectionFunction = new ConnectionFunction<>(resultHandler.sql(), cx -> resultHandler.function()
411+
ConnectionFunction<Flux<T>> connectionFunction = new DelegateConnectionFunction<>(resultHandler, cx -> resultHandler
418412
.apply(cx)
419413
.flatMap(mappingFunction));
420414
return inConnectionMany(connectionFunction);
@@ -473,8 +467,6 @@ private String getRequiredSql(Supplier<String> sqlSupplier) {
473467
Assert.state(StringUtils.hasText(sql), "SQL returned by supplier must not be empty");
474468
return sql;
475469
}
476-
477-
record ResultFunction(Function<Connection, Flux<Result>> function, String sql){}
478470
}
479471

480472

spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DefaultFetchSpec.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,31 +30,29 @@
3030
* Default {@link FetchSpec} implementation.
3131
*
3232
* @author Mark Paluch
33+
* @author Simon Baslé
3334
* @since 5.3
3435
* @param <T> the row result type
3536
*/
3637
class DefaultFetchSpec<T> implements FetchSpec<T> {
3738

3839
private final ConnectionAccessor connectionAccessor;
3940

40-
private final String sql;
41-
42-
private final Function<Connection, Flux<Result>> resultFunction;
41+
private final ResultFunction resultFunction;
4342

4443
private final Function<Connection, Mono<Long>> updatedRowsFunction;
4544

4645
private final Function<Result, Publisher<T>> resultAdapter;
4746

4847

49-
DefaultFetchSpec(ConnectionAccessor connectionAccessor, String sql,
50-
Function<Connection, Flux<Result>> resultFunction,
48+
DefaultFetchSpec(ConnectionAccessor connectionAccessor,
49+
ResultFunction resultFunction,
5150
Function<Connection, Mono<Long>> updatedRowsFunction,
5251
Function<Result, Publisher<T>> resultAdapter) {
5352

54-
this.sql = sql;
5553
this.connectionAccessor = connectionAccessor;
5654
this.resultFunction = resultFunction;
57-
this.updatedRowsFunction = updatedRowsFunction;
55+
this.updatedRowsFunction = new DelegateConnectionFunction<>(resultFunction, updatedRowsFunction);
5856
this.resultAdapter = resultAdapter;
5957
}
6058

@@ -68,7 +66,7 @@ public Mono<T> one() {
6866
}
6967
if (list.size() > 1) {
7068
return Mono.error(new IncorrectResultSizeDataAccessException(
71-
String.format("Query [%s] returned non unique result.", this.sql),
69+
String.format("Query [%s] returned non unique result.", this.resultFunction.getSql()),
7270
1));
7371
}
7472
return Mono.just(list.get(0));
@@ -82,7 +80,7 @@ public Mono<T> first() {
8280

8381
@Override
8482
public Flux<T> all() {
85-
return this.connectionAccessor.inConnectionMany(new ConnectionFunction<>(this.sql,
83+
return this.connectionAccessor.inConnectionMany(new DelegateConnectionFunction<>(this.resultFunction,
8684
connection -> this.resultFunction.apply(connection)
8785
.flatMap(this.resultAdapter)));
8886
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.r2dbc.core;
18+
19+
import java.util.function.Function;
20+
21+
import io.r2dbc.spi.Connection;
22+
23+
import org.springframework.lang.Nullable;
24+
25+
/**
26+
* A {@link ConnectionFunction} that delegates to a {@code SqlProvider} and a plain
27+
* {@code Function}.
28+
*
29+
* @author Simon Baslé
30+
* @since 5.3.26
31+
* @param <R> the type of the result of the function.
32+
*/
33+
final class DelegateConnectionFunction<R> implements ConnectionFunction<R> {
34+
35+
private final SqlProvider sql;
36+
37+
private final Function<Connection, R> function;
38+
39+
40+
DelegateConnectionFunction(SqlProvider sql, Function<Connection, R> function) {
41+
this.sql = sql;
42+
this.function = function;
43+
}
44+
45+
46+
@Override
47+
public R apply(Connection t) {
48+
return this.function.apply(t);
49+
}
50+
51+
@Nullable
52+
@Override
53+
public String getSql() {
54+
return this.sql.getSql();
55+
}
56+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.r2dbc.core;
18+
19+
import java.util.function.BiFunction;
20+
import java.util.function.Supplier;
21+
22+
import io.r2dbc.spi.Connection;
23+
import io.r2dbc.spi.Result;
24+
import io.r2dbc.spi.Statement;
25+
import reactor.core.publisher.Flux;
26+
27+
import org.springframework.lang.Nullable;
28+
import org.springframework.util.Assert;
29+
import org.springframework.util.StringUtils;
30+
31+
/**
32+
* A {@link ConnectionFunction} that produces a {@code Flux} of {@link Result} and that
33+
* defers generation of the SQL until the function has been applied.
34+
* Beforehand, the {@code getSql()} method simply returns {@code null}. The sql String is
35+
* also memoized during application, so that subsequent calls to {@link #getSql()} return
36+
* the same {@code String} without further calls to the {@code Supplier}.
37+
*
38+
* @author Mark Paluch
39+
* @author Simon Baslé
40+
* @since 5.3.26
41+
*/
42+
final class ResultFunction implements ConnectionFunction<Flux<Result>> {
43+
44+
final Supplier<String> sqlSupplier;
45+
final BiFunction<Connection, String, Statement> statementFunction;
46+
final StatementFilterFunction filterFunction;
47+
final ExecuteFunction executeFunction;
48+
49+
@Nullable
50+
String resolvedSql = null;
51+
52+
ResultFunction(Supplier<String> sqlSupplier, BiFunction<Connection, String, Statement> statementFunction, StatementFilterFunction filterFunction, ExecuteFunction executeFunction) {
53+
this.sqlSupplier = sqlSupplier;
54+
this.statementFunction = statementFunction;
55+
this.filterFunction = filterFunction;
56+
this.executeFunction = executeFunction;
57+
}
58+
59+
@Override
60+
public Flux<Result> apply(Connection connection) {
61+
String sql = this.sqlSupplier.get();
62+
Assert.state(StringUtils.hasText(sql), "SQL returned by supplier must not be empty");
63+
this.resolvedSql = sql;
64+
Statement statement = this.statementFunction.apply(connection, sql);
65+
return Flux.from(this.filterFunction.filter(statement, this.executeFunction))
66+
.cast(Result.class).checkpoint("SQL \"" + sql + "\" [DatabaseClient]");
67+
}
68+
69+
@Nullable
70+
@Override
71+
public String getSql() {
72+
return this.resolvedSql;
73+
}
74+
}

spring-r2dbc/src/test/java/org/springframework/r2dbc/core/DefaultDatabaseClientUnitTests.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.springframework.r2dbc.core;
1818

1919
import java.util.Arrays;
20+
import java.util.List;
21+
import java.util.concurrent.atomic.AtomicInteger;
2022

2123
import io.r2dbc.spi.Connection;
2224
import io.r2dbc.spi.ConnectionFactory;
@@ -63,6 +65,7 @@
6365
* @author Mark Paluch
6466
* @author Ferdinand Jacobs
6567
* @author Jens Schauder
68+
* @author Simon Baslé
6669
*/
6770
@MockitoSettings(strictness = Strictness.LENIENT)
6871
class DefaultDatabaseClientUnitTests {
@@ -385,6 +388,47 @@ void shouldApplySimpleStatementFilterFunctions() {
385388
inOrder.verifyNoMoreInteractions();
386389
}
387390

391+
@Test
392+
void sqlSupplierInvocationIsDeferredUntilSubscription() {
393+
// We'll have either 2 or 3 rows, depending on the subscription and the generated SQL
394+
MockRowMetadata metadata = MockRowMetadata.builder().columnMetadata(
395+
MockColumnMetadata.builder().name("id").javaType(Integer.class).build()).build();
396+
final MockRow row1 = MockRow.builder().metadata(metadata)
397+
.identified("id", Integer.class, 1).build();
398+
final MockRow row2 = MockRow.builder().metadata(metadata)
399+
.identified("id", Integer.class, 2).build();
400+
final MockRow row3 = MockRow.builder().metadata(metadata)
401+
.identified("id", Integer.class, 3).build();
402+
// Set up 2 mock statements
403+
mockStatementFor("SELECT id FROM test WHERE id < '3'", MockResult.builder()
404+
.row(row1, row2).build());
405+
mockStatementFor("SELECT id FROM test WHERE id < '4'", MockResult.builder()
406+
.row(row1, row2, row3).build());
407+
// Create the client
408+
DatabaseClient databaseClient = this.databaseClientBuilder.build();
409+
410+
AtomicInteger invoked = new AtomicInteger();
411+
// Assemble a publisher, but don't subscribe yet
412+
Mono<List<Integer>> operation = databaseClient
413+
.sql(() -> {
414+
int idMax = 2 + invoked.incrementAndGet();
415+
return String.format("SELECT id FROM test WHERE id < '%s'", idMax);
416+
})
417+
.map(r -> r.get("id", Integer.class))
418+
.all()
419+
.collectList();
420+
421+
assertThat(invoked).as("invoked (before subscription)").hasValue(0);
422+
423+
List<Integer> rows = operation.block();
424+
assertThat(invoked).as("invoked (after 1st subscription)").hasValue(1);
425+
assertThat(rows).containsExactly(1, 2);
426+
427+
rows = operation.block();
428+
assertThat(invoked).as("invoked (after 2nd subscription)").hasValue(2);
429+
assertThat(rows).containsExactly(1, 2, 3);
430+
}
431+
388432

389433
private Statement mockStatement() {
390434
return mockStatementFor(null, null);

0 commit comments

Comments
 (0)