Skip to content

MultipartHttpMessageWriter fails when Publisher is provided as Multipart [SPR-16406] #20952

Closed
@spring-projects-issues

Description

@spring-projects-issues

Marc-Christian Schulze opened SPR-16406 and commented

When using a Publisher to provide asynchronously a multipart the MultipartHttpMessageWriter fails to write the content with an exception:

reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.core.codec.CodecException: No suitable writer found for part: file
Caused by: org.springframework.core.codec.CodecException: No suitable writer found for part: file
  at org.springframework.http.codec.multipart.MultipartHttpMessageWriter.encodePart(MultipartHttpMessageWriter.java:276) ~[spring-web-5.0.3.BUILD-SNAPSHOT.jar:5.0.3.BUILD-SNAPSHOT]
  at org.springframework.http.codec.multipart.MultipartHttpMessageWriter.lambda$encodePartValues$3(MultipartHttpMessageWriter.java:226) ~[spring-web-5.0.3.BUILD-SNAPSHOT.jar:5.0.3.BUILD-SNAPSHOT]
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[na:1.8.0_151]
  at java.util.LinkedList$LLSpliterator.forEachRemaining(LinkedList.java:1235) ~[na:1.8.0_151]
  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[na:1.8.0_151]
  at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[na:1.8.0_151]
  at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[na:1.8.0_151]
  at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:1.8.0_151]
  at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[na:1.8.0_151]
  at org.springframework.http.codec.multipart.MultipartHttpMessageWriter.encodePartValues(MultipartHttpMessageWriter.java:226) ~[spring-web-5.0.3.BUILD-SNAPSHOT.jar:5.0.3.BUILD-SNAPSHOT]
  at org.springframework.http.codec.multipart.MultipartHttpMessageWriter.lambda$writeMultipart$2(MultipartHttpMessageWriter.java:210) ~[spring-web-5.0.3.BUILD-SNAPSHOT.jar:5.0.3.BUILD-SNAPSHOT]
  at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:357) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:210) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.core.publisher.Flux.subscribe(Flux.java:6633) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:200) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.core.publisher.Flux.subscribe(Flux.java:6633) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:461) ~[reactor-netty-0.7.3.BUILD-SNAPSHOT.jar:0.7.3.BUILD-SNAPSHOT]
  at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:191) ~[reactor-netty-0.7.3.BUILD-SNAPSHOT.jar:0.7.3.BUILD-SNAPSHOT]
  at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.19.Final.jar:4.1.19.Final]
  at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) ~[netty-transport-4.1.19.Final.jar:4.1.19.Final]
  at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814) ~[netty-transport-4.1.19.Final.jar:4.1.19.Final]
  at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) ~[netty-transport-4.1.19.Final.jar:4.1.19.Final]
  at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:831) ~[netty-transport-4.1.19.Final.jar:4.1.19.Final]
  at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1041) ~[netty-transport-4.1.19.Final.jar:4.1.19.Final]
  at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:300) ~[netty-transport-4.1.19.Final.jar:4.1.19.Final]
  at reactor.ipc.netty.NettyOutbound.lambda$sendObject$6(NettyOutbound.java:298) ~[reactor-netty-0.7.3.BUILD-SNAPSHOT.jar:0.7.3.BUILD-SNAPSHOT]
  at reactor.ipc.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:106) ~[reactor-netty-0.7.3.BUILD-SNAPSHOT.jar:0.7.3.BUILD-SNAPSHOT]
  at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.core.publisher.Mono.subscribe(Mono.java:3006) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete(FluxConcatIterable.java:141) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.core.publisher.FluxConcatIterable.subscribe(FluxConcatIterable.java:60) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.core.publisher.MonoSourceFlux.subscribe(MonoSourceFlux.java:47) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.core.publisher.Mono.subscribe(Mono.java:3006) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:172) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:53) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
  at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:390) ~[reactor-netty-0.7.3.BUILD-SNAPSHOT.jar:0.7.3.BUILD-SNAPSHOT]
  at reactor.ipc.netty.http.client.HttpClientOperations.onHandlerStart(HttpClientOperations.java:479) ~[reactor-netty-0.7.3.BUILD-SNAPSHOT.jar:0.7.3.BUILD-SNAPSHOT]
  at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.19.Final.jar:4.1.19.Final]
  at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [netty-common-4.1.19.Final.jar:4.1.19.Final]
  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) [netty-transport-4.1.19.Final.jar:4.1.19.Final]
  at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) [netty-common-4.1.19.Final.jar:4.1.19.Final]
  at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]

I attached two test cases, a positive and a negative one.
The generic test:

private void runTest(Publisher<String> publisher)
      throws InterruptedException, ExecutionException, TimeoutException {
    CompletableFuture<String> f = new CompletableFuture<>();

    MultipartBodyBuilder builder = new MultipartBodyBuilder();
    builder.asyncPart("file", publisher, String.class).headers(h -> {
      h.setContentDispositionFormData("file", "somefile.txt");
    });

    WebClient.create().post() //
        .uri(getURI()) //
        .body(BodyInserters.fromMultipartData(builder.build())) //
        .retrieve() //
        .bodyToMono(String.class) //
        .subscribe(filename -> {
          f.complete(filename);
        });

    String filename = f.get(5, TimeUnit.SECONDS);
    assertEquals("somefile.txt", filename);
  }

The positive one that is working fine:

  @Test
  public void positiveTest() throws InterruptedException, ExecutionException, TimeoutException {
    Publisher<String> publisher = Mono.just("someString");

    runTest(publisher);
  }

The negative Test that fails with the above-mentioned exception:

  @Test
  public void negativeTest() throws InterruptedException, ExecutionException, TimeoutException {
    Publisher<String> publisher = new Publisher<String>() {

      @Override
      public void subscribe(Subscriber<? super String> subscriber) {
        subscriber.onNext("someString");
        subscriber.onComplete();
      }
    };

    runTest(publisher);
  }

See also #20922.


Affects: 5.0.2

Attachments:

Issue Links:

Metadata

Metadata

Assignees

Labels

in: webIssues in web modules (web, webmvc, webflux, websocket)status: invalidAn issue that we don't feel is valid

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions