|
29 | 29 | import org.springframework.core.ParameterizedTypeReference;
|
30 | 30 | import org.springframework.core.ResolvableType;
|
31 | 31 | import org.springframework.core.io.buffer.DataBuffer;
|
32 |
| -import org.springframework.http.HttpMessage; |
33 | 32 | import org.springframework.http.MediaType;
|
34 | 33 | import org.springframework.http.ReactiveHttpInputMessage;
|
35 | 34 | import org.springframework.http.codec.HttpMessageReader;
|
36 | 35 | import org.springframework.http.codec.multipart.Part;
|
37 | 36 | import org.springframework.http.server.reactive.ServerHttpRequest;
|
38 |
| -import org.springframework.http.server.reactive.ServerHttpResponse; |
39 | 37 | import org.springframework.util.MultiValueMap;
|
40 | 38 |
|
41 | 39 | /**
|
42 |
| - * Implementations of {@link BodyExtractor} that read various bodies, such a reactive streams. |
| 40 | + * Static factory methods for {@link BodyExtractor} implementations. |
43 | 41 | *
|
44 | 42 | * @author Arjen Poutsma
|
45 | 43 | * @author Sebastien Deleuze
|
| 44 | + * @author Rossen Stoyanchev |
46 | 45 | * @since 5.0
|
47 | 46 | */
|
48 | 47 | public abstract class BodyExtractors {
|
49 | 48 |
|
50 |
| - private static final ResolvableType FORM_MAP_TYPE = |
| 49 | + private static final ResolvableType FORM_DATA_TYPE = |
51 | 50 | ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, String.class);
|
52 | 51 |
|
53 |
| - private static final ResolvableType MULTIPART_MAP_TYPE = ResolvableType.forClassWithGenerics( |
| 52 | + private static final ResolvableType MULTIPART_DATA_TYPE = ResolvableType.forClassWithGenerics( |
54 | 53 | MultiValueMap.class, String.class, Part.class);
|
55 | 54 |
|
56 | 55 | private static final ResolvableType PART_TYPE = ResolvableType.forClass(Part.class);
|
57 | 56 |
|
58 | 57 | private static final ResolvableType VOID_TYPE = ResolvableType.forClass(Void.class);
|
59 | 58 |
|
| 59 | + |
60 | 60 | /**
|
61 |
| - * Return a {@code BodyExtractor} that reads into a Reactor {@link Mono}. |
62 |
| - * @param elementClass the class of element in the {@code Mono} |
63 |
| - * @param <T> the element type |
64 |
| - * @return a {@code BodyExtractor} that reads a mono |
| 61 | + * Extractor to decode the input content into {@code Mono<T>}. |
| 62 | + * @param elementClass the class of the element type to decode to |
| 63 | + * @param <T> the element type to decode to |
| 64 | + * @return {@code BodyExtractor} for {@code Mono<T>} |
65 | 65 | */
|
66 | 66 | public static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(Class<? extends T> elementClass) {
|
67 | 67 | return toMono(ResolvableType.forClass(elementClass));
|
68 | 68 | }
|
69 | 69 |
|
70 | 70 | /**
|
71 |
| - * Return a {@code BodyExtractor} that reads into a Reactor {@link Mono}. |
72 |
| - * The given {@link ParameterizedTypeReference} is used to pass generic type information, for |
73 |
| - * instance when using the {@link org.springframework.web.reactive.function.client.WebClient WebClient} |
74 |
| - * <pre class="code"> |
75 |
| - * Mono<Map<String, String>> body = this.webClient |
76 |
| - * .get() |
77 |
| - * .uri("http://example.com") |
78 |
| - * .exchange() |
79 |
| - * .flatMap(r -> r.body(toMono(new ParameterizedTypeReference<Map<String,String>>() {}))); |
80 |
| - * </pre> |
81 |
| - * @param typeReference a reference to the type of element in the {@code Mono} |
82 |
| - * @param <T> the element type |
83 |
| - * @return a {@code BodyExtractor} that reads a mono |
| 71 | + * Variant of {@link #toMono(Class)} for type information with generics. |
| 72 | + * @param typeRef the type reference for the type to decode to |
| 73 | + * @param <T> the element type to decode to |
| 74 | + * @return {@code BodyExtractor} for {@code Mono<T>} |
84 | 75 | */
|
85 |
| - public static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono( |
86 |
| - ParameterizedTypeReference<T> typeReference) { |
87 |
| - |
88 |
| - return toMono(ResolvableType.forType(typeReference.getType())); |
| 76 | + public static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(ParameterizedTypeReference<T> typeRef) { |
| 77 | + return toMono(ResolvableType.forType(typeRef.getType())); |
89 | 78 | }
|
90 | 79 |
|
91 |
| - static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(ResolvableType elementType) { |
92 |
| - return (inputMessage, context) -> readWithMessageReaders(inputMessage, context, |
93 |
| - elementType, |
94 |
| - (HttpMessageReader<T> reader) -> { |
95 |
| - Optional<ServerHttpResponse> serverResponse = context.serverResponse(); |
96 |
| - if (serverResponse.isPresent() && inputMessage instanceof ServerHttpRequest) { |
97 |
| - return reader.readMono(elementType, elementType, (ServerHttpRequest) inputMessage, |
98 |
| - serverResponse.get(), context.hints()); |
99 |
| - } |
100 |
| - else { |
101 |
| - return reader.readMono(elementType, inputMessage, context.hints()); |
102 |
| - } |
103 |
| - }, |
104 |
| - ex -> (inputMessage.getHeaders().getContentType() == null) ? |
105 |
| - Mono.from(permitEmptyOrFail(inputMessage, ex)) : Mono.error(ex), |
106 |
| - Mono::empty); |
| 80 | + private static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(ResolvableType elementType) { |
| 81 | + return (inputMessage, context) -> |
| 82 | + readWithMessageReaders(inputMessage, context, elementType, |
| 83 | + (HttpMessageReader<T> reader) -> readToMono(inputMessage, context, elementType, reader), |
| 84 | + ex -> Mono.from(unsupportedErrorHandler(inputMessage, ex)), |
| 85 | + Mono::empty); |
107 | 86 | }
|
108 | 87 |
|
109 | 88 | /**
|
110 |
| - * Return a {@code BodyExtractor} that reads into a Reactor {@link Flux}. |
111 |
| - * @param elementClass the class of element in the {@code Flux} |
112 |
| - * @param <T> the element type |
113 |
| - * @return a {@code BodyExtractor} that reads a flux |
| 89 | + * Extractor to decode the input content into {@code Flux<T>}. |
| 90 | + * @param elementClass the class of the element type to decode to |
| 91 | + * @param <T> the element type to decode to |
| 92 | + * @return {@code BodyExtractor} for {@code Flux<T>} |
114 | 93 | */
|
115 | 94 | public static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(Class<? extends T> elementClass) {
|
116 | 95 | return toFlux(ResolvableType.forClass(elementClass));
|
117 | 96 | }
|
118 | 97 |
|
119 | 98 | /**
|
120 |
| - * Return a {@code BodyExtractor} that reads into a Reactor {@link Flux}. |
121 |
| - * The given {@link ParameterizedTypeReference} is used to pass generic type information, for |
122 |
| - * instance when using the {@link org.springframework.web.reactive.function.client.WebClient WebClient} |
123 |
| - * <pre class="code"> |
124 |
| - * Flux<ServerSentEvent<String>> body = this.webClient |
125 |
| - * .get() |
126 |
| - * .uri("http://example.com") |
127 |
| - * .exchange() |
128 |
| - * .flatMap(r -> r.body(toFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {}))); |
129 |
| - * </pre> |
130 |
| - * @param typeReference a reference to the type of element in the {@code Flux} |
131 |
| - * @param <T> the element type |
132 |
| - * @return a {@code BodyExtractor} that reads a flux |
| 99 | + * Variant of {@link #toFlux(Class)} for type information with generics. |
| 100 | + * @param typeRef the type reference for the type to decode to |
| 101 | + * @param <T> the element type to decode to |
| 102 | + * @return {@code BodyExtractor} for {@code Flux<T>} |
133 | 103 | */
|
134 |
| - public static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux( |
135 |
| - ParameterizedTypeReference<T> typeReference) { |
136 |
| - |
137 |
| - return toFlux(ResolvableType.forType(typeReference.getType())); |
| 104 | + public static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(ParameterizedTypeReference<T> typeRef) { |
| 105 | + return toFlux(ResolvableType.forType(typeRef.getType())); |
138 | 106 | }
|
139 | 107 |
|
140 | 108 | @SuppressWarnings("unchecked")
|
141 |
| - static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(ResolvableType elementType) { |
142 |
| - return (inputMessage, context) -> readWithMessageReaders(inputMessage, context, |
143 |
| - elementType, |
144 |
| - (HttpMessageReader<T> reader) -> { |
145 |
| - Optional<ServerHttpResponse> serverResponse = context.serverResponse(); |
146 |
| - if (serverResponse.isPresent() && inputMessage instanceof ServerHttpRequest) { |
147 |
| - return reader.read(elementType, elementType, (ServerHttpRequest) inputMessage, |
148 |
| - serverResponse.get(), context.hints()); |
149 |
| - } |
150 |
| - else { |
151 |
| - return reader.read(elementType, inputMessage, context.hints()); |
152 |
| - } |
153 |
| - }, |
154 |
| - ex -> (inputMessage.getHeaders().getContentType() == null) ? |
155 |
| - permitEmptyOrFail(inputMessage, ex) : Flux.error(ex), |
156 |
| - Flux::empty); |
| 109 | + private static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(ResolvableType elementType) { |
| 110 | + return (inputMessage, context) -> |
| 111 | + readWithMessageReaders(inputMessage, context, elementType, |
| 112 | + (HttpMessageReader<T> reader) -> readToFlux(inputMessage, context, elementType, reader), |
| 113 | + ex -> unsupportedErrorHandler(inputMessage, ex), |
| 114 | + Flux::empty); |
157 | 115 | }
|
158 | 116 |
|
159 |
| - @SuppressWarnings("unchecked") |
160 |
| - private static <T> Flux<T> permitEmptyOrFail(ReactiveHttpInputMessage message, UnsupportedMediaTypeException ex) { |
161 |
| - return message.getBody().doOnNext(buffer -> { |
162 |
| - throw ex; |
163 |
| - }).map(o -> (T) o); |
164 |
| - } |
| 117 | + |
| 118 | + // Extractors for specific content .. |
165 | 119 |
|
166 | 120 | /**
|
167 |
| - * Return a {@code BodyExtractor} that reads form data into a {@link MultiValueMap}. |
| 121 | + * Extractor to read form data into {@code MultiValueMap<String, String>}. |
168 | 122 | * <p>As of 5.1 this method can also be used on the client side to read form
|
169 | 123 | * data from a server response (e.g. OAuth).
|
170 |
| - * @return a {@code BodyExtractor} that reads form data |
| 124 | + * @return {@code BodyExtractor} for form data |
171 | 125 | */
|
172 | 126 | public static BodyExtractor<Mono<MultiValueMap<String, String>>, ReactiveHttpInputMessage> toFormData() {
|
173 | 127 | return (message, context) -> {
|
174 |
| - ResolvableType type = FORM_MAP_TYPE; |
175 |
| - HttpMessageReader<MultiValueMap<String, String>> reader = |
176 |
| - messageReader(type, MediaType.APPLICATION_FORM_URLENCODED, context); |
177 |
| - Optional<ServerHttpResponse> response = context.serverResponse(); |
178 |
| - if (response.isPresent() && message instanceof ServerHttpRequest) { |
179 |
| - return reader.readMono(type, type, (ServerHttpRequest) message, response.get(), context.hints()); |
180 |
| - } |
181 |
| - else { |
182 |
| - return reader.readMono(type, message, context.hints()); |
183 |
| - } |
| 128 | + ResolvableType elementType = FORM_DATA_TYPE; |
| 129 | + MediaType mediaType = MediaType.APPLICATION_FORM_URLENCODED; |
| 130 | + HttpMessageReader<MultiValueMap<String, String>> reader = findReader(elementType, mediaType, context); |
| 131 | + return readToMono(message, context, elementType, reader); |
184 | 132 | };
|
185 | 133 | }
|
186 | 134 |
|
187 | 135 | /**
|
188 |
| - * Return a {@code BodyExtractor} that reads multipart (i.e. file upload) form data into a |
189 |
| - * {@link MultiValueMap}. |
190 |
| - * @return a {@code BodyExtractor} that reads multipart data |
| 136 | + * Extractor to read multipart data into a {@code MultiValueMap<String, Part>}. |
| 137 | + * @return {@code BodyExtractor} for multipart data |
191 | 138 | */
|
192 |
| - // Note that the returned BodyExtractor is parameterized to ServerHttpRequest, not |
193 |
| - // ReactiveHttpInputMessage like other methods, since reading form data only typically happens on |
194 |
| - // the server-side |
| 139 | + // Parameterized for server-side use |
195 | 140 | public static BodyExtractor<Mono<MultiValueMap<String, Part>>, ServerHttpRequest> toMultipartData() {
|
196 | 141 | return (serverRequest, context) -> {
|
197 |
| - ResolvableType type = MULTIPART_MAP_TYPE; |
198 |
| - HttpMessageReader<MultiValueMap<String, Part>> reader = |
199 |
| - messageReader(type, MediaType.MULTIPART_FORM_DATA, context); |
200 |
| - return context.serverResponse() |
201 |
| - .map(response -> reader.readMono(type, type, serverRequest, response, context.hints())) |
202 |
| - .orElseGet(() -> reader.readMono(type, serverRequest, context.hints())); |
| 142 | + ResolvableType elementType = MULTIPART_DATA_TYPE; |
| 143 | + MediaType mediaType = MediaType.MULTIPART_FORM_DATA; |
| 144 | + HttpMessageReader<MultiValueMap<String, Part>> reader = findReader(elementType, mediaType, context); |
| 145 | + return readToMono(serverRequest, context, elementType, reader); |
203 | 146 | };
|
204 | 147 | }
|
205 | 148 |
|
206 | 149 | /**
|
207 |
| - * Return a {@code BodyExtractor} that reads multipart (i.e. file upload) form data into a |
208 |
| - * {@link MultiValueMap}. |
209 |
| - * @return a {@code BodyExtractor} that reads multipart data |
| 150 | + * Extractor to read multipart data into {@code Flux<Part>}. |
| 151 | + * @return {@code BodyExtractor} for multipart request parts |
210 | 152 | */
|
211 |
| - // Note that the returned BodyExtractor is parameterized to ServerHttpRequest, not |
212 |
| - // ReactiveHttpInputMessage like other methods, since reading form data only typically happens on |
213 |
| - // the server-side |
| 153 | + // Parameterized for server-side use |
214 | 154 | public static BodyExtractor<Flux<Part>, ServerHttpRequest> toParts() {
|
215 | 155 | return (serverRequest, context) -> {
|
216 |
| - ResolvableType type = PART_TYPE; |
217 |
| - HttpMessageReader<Part> reader = messageReader(type, MediaType.MULTIPART_FORM_DATA, context); |
218 |
| - return context.serverResponse() |
219 |
| - .map(response -> reader.read(type, type, serverRequest, response, context.hints())) |
220 |
| - .orElseGet(() -> reader.read(type, serverRequest, context.hints())); |
| 156 | + ResolvableType elementType = PART_TYPE; |
| 157 | + MediaType mediaType = MediaType.MULTIPART_FORM_DATA; |
| 158 | + HttpMessageReader<Part> reader = findReader(elementType, mediaType, context); |
| 159 | + return readToFlux(serverRequest, context, elementType, reader); |
221 | 160 | };
|
222 | 161 | }
|
223 | 162 |
|
224 | 163 | /**
|
225 |
| - * Return a {@code BodyExtractor} that returns the body of the message as a {@link Flux} of |
226 |
| - * {@link DataBuffer}s. |
227 |
| - * <p><strong>Note</strong> that the returned buffers should be released after usage by calling |
228 |
| - * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer)} |
229 |
| - * @return a {@code BodyExtractor} that returns the body |
230 |
| - * @see ReactiveHttpInputMessage#getBody() |
| 164 | + * Extractor that returns the raw {@link DataBuffer}s. |
| 165 | + * <p><strong>Note:</strong> the data buffers should be |
| 166 | + * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) |
| 167 | + * released} after being used. |
| 168 | + * @return {@code BodyExtractor} for data buffers |
231 | 169 | */
|
232 | 170 | public static BodyExtractor<Flux<DataBuffer>, ReactiveHttpInputMessage> toDataBuffers() {
|
233 | 171 | return (inputMessage, context) -> inputMessage.getBody();
|
234 | 172 | }
|
235 | 173 |
|
236 | 174 |
|
| 175 | + // Private support methods |
| 176 | + |
237 | 177 | private static <T, S extends Publisher<T>> S readWithMessageReaders(
|
238 |
| - ReactiveHttpInputMessage inputMessage, BodyExtractor.Context context, ResolvableType elementType, |
| 178 | + ReactiveHttpInputMessage message, BodyExtractor.Context context, ResolvableType elementType, |
239 | 179 | Function<HttpMessageReader<T>, S> readerFunction,
|
240 |
| - Function<UnsupportedMediaTypeException, S> unsupportedError, |
241 |
| - Supplier<S> empty) { |
| 180 | + Function<UnsupportedMediaTypeException, S> errorFunction, |
| 181 | + Supplier<S> emptySupplier) { |
242 | 182 |
|
243 | 183 | if (VOID_TYPE.equals(elementType)) {
|
244 |
| - return empty.get(); |
| 184 | + return emptySupplier.get(); |
245 | 185 | }
|
246 |
| - MediaType contentType = contentType(inputMessage); |
247 |
| - List<HttpMessageReader<?>> messageReaders = context.messageReaders(); |
248 |
| - return messageReaders.stream() |
249 |
| - .filter(r -> r.canRead(elementType, contentType)) |
| 186 | + |
| 187 | + MediaType contentType = Optional.ofNullable(message.getHeaders().getContentType()) |
| 188 | + .orElse(MediaType.APPLICATION_OCTET_STREAM); |
| 189 | + |
| 190 | + return context.messageReaders().stream() |
| 191 | + .filter(reader -> reader.canRead(elementType, contentType)) |
250 | 192 | .findFirst()
|
251 | 193 | .map(BodyExtractors::<T>cast)
|
252 | 194 | .map(readerFunction)
|
253 |
| - .orElseGet(() -> { |
254 |
| - List<MediaType> supportedMediaTypes = messageReaders.stream() |
255 |
| - .flatMap(reader -> reader.getReadableMediaTypes().stream()) |
256 |
| - .collect(Collectors.toList()); |
257 |
| - UnsupportedMediaTypeException error = |
258 |
| - new UnsupportedMediaTypeException(contentType, supportedMediaTypes, elementType); |
259 |
| - return unsupportedError.apply(error); |
260 |
| - }); |
| 195 | + .orElseGet(() -> errorFunction.apply(unsupportedError(context, elementType, contentType))); |
| 196 | + } |
| 197 | + |
| 198 | + private static UnsupportedMediaTypeException unsupportedError(BodyExtractor.Context context, |
| 199 | + ResolvableType elementType, MediaType contentType) { |
| 200 | + |
| 201 | + List<MediaType> supportedMediaTypes = context.messageReaders().stream() |
| 202 | + .flatMap(reader -> reader.getReadableMediaTypes().stream()) |
| 203 | + .collect(Collectors.toList()); |
| 204 | + |
| 205 | + return new UnsupportedMediaTypeException(contentType, supportedMediaTypes, elementType); |
261 | 206 | }
|
262 | 207 |
|
263 |
| - private static <T> HttpMessageReader<T> messageReader(ResolvableType elementType, |
264 |
| - MediaType mediaType, BodyExtractor.Context context) { |
| 208 | + private static <T> Mono<T> readToMono(ReactiveHttpInputMessage message, BodyExtractor.Context context, |
| 209 | + ResolvableType type, HttpMessageReader<T> reader) { |
| 210 | + |
| 211 | + return context.serverResponse() |
| 212 | + .map(response -> reader.readMono(type, type, (ServerHttpRequest) message, response, context.hints())) |
| 213 | + .orElseGet(() -> reader.readMono(type, message, context.hints())); |
| 214 | + } |
| 215 | + |
| 216 | + private static <T> Flux<T> readToFlux(ReactiveHttpInputMessage message, BodyExtractor.Context context, |
| 217 | + ResolvableType type, HttpMessageReader<T> reader) { |
| 218 | + |
| 219 | + return context.serverResponse() |
| 220 | + .map(response -> reader.read(type, type, (ServerHttpRequest) message, response, context.hints())) |
| 221 | + .orElseGet(() -> reader.read(type, message, context.hints())); |
| 222 | + } |
| 223 | + |
| 224 | + private static <T> Flux<T> unsupportedErrorHandler( |
| 225 | + ReactiveHttpInputMessage inputMessage, UnsupportedMediaTypeException ex) { |
| 226 | + |
| 227 | + if (inputMessage.getHeaders().getContentType() == null) { |
| 228 | + // Empty body with no content type is ok |
| 229 | + return inputMessage.getBody().map(o -> { |
| 230 | + throw ex; |
| 231 | + }); |
| 232 | + } |
| 233 | + else { |
| 234 | + return Flux.error(ex); |
| 235 | + } |
| 236 | + } |
| 237 | + |
| 238 | + private static <T> HttpMessageReader<T> findReader( |
| 239 | + ResolvableType elementType, MediaType mediaType, BodyExtractor.Context context) { |
| 240 | + |
265 | 241 | return context.messageReaders().stream()
|
266 | 242 | .filter(messageReader -> messageReader.canRead(elementType, mediaType))
|
267 | 243 | .findFirst()
|
268 | 244 | .map(BodyExtractors::<T>cast)
|
269 | 245 | .orElseThrow(() -> new IllegalStateException(
|
270 |
| - "Could not find HttpMessageReader that supports \"" + mediaType + |
271 |
| - "\" and \"" + elementType + "\"")); |
272 |
| - } |
273 |
| - |
274 |
| - private static MediaType contentType(HttpMessage message) { |
275 |
| - MediaType result = message.getHeaders().getContentType(); |
276 |
| - return result != null ? result : MediaType.APPLICATION_OCTET_STREAM; |
| 246 | + "No HttpMessageReader for \"" + mediaType + "\" and \"" + elementType + "\"")); |
277 | 247 | }
|
278 | 248 |
|
279 | 249 | @SuppressWarnings("unchecked")
|
280 |
| - private static <T> HttpMessageReader<T> cast(HttpMessageReader<?> messageReader) { |
281 |
| - return (HttpMessageReader<T>) messageReader; |
| 250 | + private static <T> HttpMessageReader<T> cast(HttpMessageReader<?> reader) { |
| 251 | + return (HttpMessageReader<T>) reader; |
282 | 252 | }
|
283 | 253 |
|
284 | 254 | }
|
0 commit comments