Skip to content

Commit 7d7ed88

Browse files
committed
Provide WebClient#exchange() alternative for Coroutines
This commit adds awaitExchange { } and exchangeToFlow { } extensions as Coroutines variants for exchangeToMono() and exchangeToFlux(). Closes gh-25751
1 parent e899397 commit 7d7ed88

File tree

3 files changed

+70
-5
lines changed

3 files changed

+70
-5
lines changed

spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,9 +16,13 @@
1616

1717
package org.springframework.web.reactive.function.client
1818

19+
import kotlinx.coroutines.Dispatchers
1920
import kotlinx.coroutines.flow.Flow
2021
import kotlinx.coroutines.reactive.asFlow
22+
import kotlinx.coroutines.reactive.awaitFirst
2123
import kotlinx.coroutines.reactive.awaitSingle
24+
import kotlinx.coroutines.reactor.asFlux
25+
import kotlinx.coroutines.reactor.mono
2226
import org.reactivestreams.Publisher
2327
import org.springframework.core.ParameterizedTypeReference
2428
import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec
@@ -70,9 +74,29 @@ inline fun <reified T : Any> RequestBodySpec.body(producer: Any): RequestHeaders
7074
* @since 5.2
7175
*/
7276
@Suppress("DEPRECATION")
77+
@Deprecated("Deprecated since 5.3 due to the possibility to leak memory and/or connections; please," +
78+
"use awaitExchange { } or exchangeToFlow { } instead; consider also using retrieve()" +
79+
"which provides access to the response status and headers via ResponseEntity along with error status handling.")
7380
suspend fun RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchange(): ClientResponse =
7481
exchange().awaitSingle()
7582

83+
/**
84+
* Coroutines variant of [WebClient.RequestHeadersSpec.exchangeToMono].
85+
*
86+
* @author Sebastien Deleuze
87+
* @since 5.3
88+
*/
89+
suspend fun <T: Any> RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchange(responseHandler: suspend (ClientResponse) -> T): T =
90+
exchangeToMono { mono(Dispatchers.Unconfined) { responseHandler.invoke(it) } }.awaitFirst()
91+
92+
/**
93+
* Coroutines variant of [WebClient.RequestHeadersSpec.exchangeToFlux].
94+
*
95+
* @author Sebastien Deleuze
96+
* @since 5.3
97+
*/
98+
fun <T: Any> RequestHeadersSpec<out RequestHeadersSpec<*>>.exchangeToFlow(responseHandler: (ClientResponse) -> Flow<T>): Flow<T> =
99+
exchangeToFlux { responseHandler.invoke(it).asFlux() }.asFlow()
76100

77101
/**
78102
* Extension for [WebClient.ResponseSpec.bodyToMono] providing a `bodyToMono<Foo>()` variant

spring-webflux/src/test/kotlin/org/springframework/web/reactive/function/client/WebClientExtensionsTests.kt

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,13 +20,17 @@ import io.mockk.every
2020
import io.mockk.mockk
2121
import io.mockk.verify
2222
import kotlinx.coroutines.flow.Flow
23+
import kotlinx.coroutines.flow.flow
24+
import kotlinx.coroutines.flow.toList
2325
import kotlinx.coroutines.runBlocking
2426
import org.assertj.core.api.Assertions.assertThat
2527
import org.junit.jupiter.api.Test
2628
import org.reactivestreams.Publisher
2729
import org.springframework.core.ParameterizedTypeReference
30+
import reactor.core.publisher.Flux
2831
import reactor.core.publisher.Mono
2932
import java.util.concurrent.CompletableFuture
33+
import java.util.function.Function
3034

3135
/**
3236
* Mock object based tests for [WebClient] Kotlin extensions
@@ -89,6 +93,29 @@ class WebClientExtensionsTests {
8993
}
9094
}
9195

96+
@Test
97+
fun `awaitExchange with function parameter`() {
98+
val foo = mockk<Foo>()
99+
every { requestBodySpec.exchangeToMono(any<Function<ClientResponse, Mono<Foo>>>()) } returns Mono.just(foo)
100+
runBlocking {
101+
assertThat(requestBodySpec.awaitExchange { foo }).isEqualTo(foo)
102+
}
103+
}
104+
105+
@Test
106+
fun exchangeToFlow() {
107+
val foo = mockk<Foo>()
108+
every { requestBodySpec.exchangeToFlux(any<Function<ClientResponse, Flux<Foo>>>()) } returns Flux.just(foo, foo)
109+
runBlocking {
110+
assertThat(requestBodySpec.exchangeToFlow {
111+
flow {
112+
emit(foo)
113+
emit(foo)
114+
}
115+
}.toList()).isEqualTo(listOf(foo, foo))
116+
}
117+
}
118+
92119
@Test
93120
fun awaitBody() {
94121
val spec = mockk<WebClient.ResponseSpec>()

src/docs/asciidoc/web/webflux-webclient.adoc

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -537,10 +537,10 @@ responses, use `onStatus` handlers as follows:
537537

538538

539539
[[webflux-client-exchange]]
540-
== `exchangeToMono()`
540+
== Exchange
541541

542-
The `exchangeToMono()` and `exchangeToFlux()` methods are useful for more advanced
543-
cases that require more control, such as to decode the response differently
542+
The `exchangeToMono()` and `exchangeToFlux()` methods (or `awaitExchange { }` and `exchangeToFlow { }` in Kotlin)
543+
are useful for more advanced cases that require more control, such as to decode the response differently
544544
depending on the response status:
545545

546546
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
@@ -564,6 +564,20 @@ depending on the response status:
564564
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
565565
.Kotlin
566566
----
567+
val entity = client.get()
568+
.uri("/persons/1")
569+
.accept(MediaType.APPLICATION_JSON)
570+
.awaitExchange {
571+
if (response.statusCode() == HttpStatus.OK) {
572+
return response.awaitBody<Person>();
573+
}
574+
else if (response.statusCode().is4xxClientError) {
575+
return response.awaitBody<ErrorContainer>();
576+
}
577+
else {
578+
return response.createExceptionAndAwait();
579+
}
580+
}
567581
----
568582

569583
When using the above, after the returned `Mono` or `Flux` completes, the response body

0 commit comments

Comments
 (0)