MultipartHttpMessageWriter should not subscribe to Publisher multipart data [SPR-16402] #20948

Closed
spring-projects-issues opened this issue Jan 19, 2018 · 6 comments
Assignees
Labels
type: bug A general bug
Milestone

Comments

@spring-projects-issues
Collaborator

spring-projects-issues commented Jan 19, 2018

Arjen Poutsma opened SPR-16402 and commented

The MultipartHttpMessageWriter should not subscribe to the partWritten mono, as that can result in exceptions when the data being written itself comes from a publisher that can only be subscribed to once.

For instance, when the data is coming from a Channel, and that channel closes after the first subscription, a second subscription to the publisher results in I/O exceptions.
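
To make the failure mode concrete, here is a minimal sketch of a source that tolerates only one subscription. It is not the Spring code: `OneShotPublisher` and `CollectingSubscriber` are hypothetical names, and the JDK's `java.util.concurrent.Flow` interfaces are used as a stand-in for the Reactive Streams interfaces so that the example has no dependencies. It is single-threaded only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical source that, like a channel-backed publisher, can be consumed only once. */
final class OneShotPublisher implements Flow.Publisher<String> {
    private final List<String> items;
    private final AtomicBoolean claimed = new AtomicBoolean();

    OneShotPublisher(List<String> items) {
        this.items = List.copyOf(items);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        if (!claimed.compareAndSet(false, true)) {
            // Second subscription: hand over a no-op subscription, then reject.
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            subscriber.onError(new IllegalStateException("source already consumed"));
            return;
        }
        subscriber.onSubscribe(new Flow.Subscription() {
            private int index;
            private long demand;
            private boolean emitting;
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("non-positive request"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (emitting) {
                    return; // re-entrant request: the outer loop picks up the new demand
                }
                emitting = true;
                while (!done && demand > 0 && index < items.size()) {
                    demand--;
                    subscriber.onNext(items.get(index++));
                }
                emitting = false;
                if (!done && index == items.size()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

/** Records every signal it receives. */
final class CollectingSubscriber implements Flow.Subscriber<String> {
    final List<String> received = new ArrayList<>();
    Throwable error;
    boolean completed;

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(String item) { received.add(item); }
    @Override public void onError(Throwable t) { error = t; }
    @Override public void onComplete() { completed = true; }
}

final class OneShotDemo {
    public static void main(String[] args) {
        OneShotPublisher source = new OneShotPublisher(List.of("foo", "bar", "baz"));
        CollectingSubscriber first = new CollectingSubscriber();
        CollectingSubscriber second = new CollectingSubscriber();
        source.subscribe(first);   // receives all items, then onComplete
        source.subscribe(second);  // receives only onError
        System.out.println(first.received + " / " + second.error);
    }
}
```

Any component that subscribes to such a source a second time, for whatever reason, ends up on the `second` path, which is the situation described above.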


Affects: 5.0.2

Reference URL: https://github.com/rstoyanchev/spring-framework/blob/6c3a64578c10dd4e6fed933864f1721cf35203ae/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriter.java#L278

Issue Links:

Referenced from: commits 09f1f72, afd248d

@spring-projects-issues
Collaborator Author

Rossen Stoyanchev commented

Indeed, that .subscribe() was there to ensure the part writer has done the write (which may be nested and hence not immediately available), so that we can then get the part content. However, that effectively meant the content was consumed twice.

@spring-projects-issues
Collaborator Author

Arjen Poutsma commented

Not sure if this fix is enough, as the following still fails:

    @Test
    public void spr16402() {
        UnicastProcessor<String> processor = UnicastProcessor.create(); // only allows for single subscription
        Flux.just("foo", "bar", "baz").subscribe(processor);

        MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder();
        bodyBuilder.asyncPart("name", processor, String.class);

        Mono<MultiValueMap<String, HttpEntity<?>>> result = Mono.just(bodyBuilder.build());

        MockServerHttpResponse response = new MockServerHttpResponse();
        Map<String, Object> hints = Collections.emptyMap();
        this.writer.write(result, null, MediaType.MULTIPART_FORM_DATA, response, hints).block();
    }

@spring-projects-issues
Collaborator Author

Marc-Christian Schulze commented

I rebuilt my code using the latest 5.0.3 build 20 minutes ago, but it still doesn't work.
My use of the Publisher looks like this:

// once I receive the client's request
AsyncWebRequest req = startRequest("receivedFile.txt");

// ... time passes on until I receive parts of the actual content
req.write(theRecentlyReceivedContent);

// ... time passes on until I receive parts of the actual content
req.write(theRecentlyReceivedContent);

// once I got the last buffer I can close the request which should flush the pending request
req.write(theRecentlyReceivedContent);
req.close();
WebResult result = req.getWebResult();

public AsyncWebRequest startRequest(String fileName) {
  AsyncWebRequest requestHandle = new AsyncWebRequest();

  MultipartBodyBuilder builder = new MultipartBodyBuilder();
  builder.asyncPart("file", requestHandle, ByteBuffer.class).header("filename", fileName);

  WebClient 
    .create(myUrl) 
    .post() 
    .uri("/someUri") 
    .body(BodyInserters.fromMultipartData(builder.build())) 
    .retrieve() 
    .bodyToFlux(WebResult.class) 
    .subscribe(requestHandle);
  
  return requestHandle;
}

class AsyncWebRequest implements Publisher<ByteBuffer>, WritableByteChannel, Consumer<WebResult> {
  private WebResult result;
  private Subscriber<? super ByteBuffer> subscriber;

  // not declared by any of the implemented interfaces, so no @Override
  public WebResult getWebResult() {
    return result;
  }

  @Override
  public void close() throws IOException {
    subscriber.onComplete();
    subscriber = null;
  }

  // not declared by any of the implemented interfaces, so no @Override
  public WritableByteChannel getChannel() {
    return this;
  }

  @Override
  public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
    // although the API says this method can be called multiple times, we expect
    // it to be called only once
    this.subscriber = subscriber;
  }

  @Override
  public boolean isOpen() {
    return subscriber != null;
  }

  @Override
  public int write(ByteBuffer src) throws IOException {
    subscriber.onNext(src);
    return src.limit();
  }

  @Override
  public void accept(WebResult result) {
    this.result = result;
  }
}

class WebResult {
  // ... any fields
}

So the idea is to create a pending asynchronous web request within the scope of an asynchronous client request, provide the content via write(ByteBuffer) whenever the client delivers it, and finally close the request, thereby flushing it. It's comparable to a proxy server that does not want to keep the entire file upload in memory but wants to dispatch the I/O asynchronously.

@spring-projects-issues
Collaborator Author

spring-projects-issues commented Jan 20, 2018

Rossen Stoyanchev commented

Arjen Poutsma this should be fixed now. I've added a couple of tests based on the one you have above. Note that I made one extra fix for a problem I noticed with writing the filename. I linked that one to #20922.

Marc-Christian Schulze I can't guarantee that this fixes all of your issues. I think Arjen has some extra fixes under #20924 but we should be pretty close now. The issue described in this ticket should be addressed now.

@spring-projects-issues
Collaborator Author

Arjen Poutsma commented

Marc-Christian Schulze I looked at the sample code, and I still don't really understand what you are trying to do :). A class that's a publisher, channel, and consumer?

One thing is for sure: AsyncWebRequest implements Publisher, but does not follow the contract as laid out in the Reactive Streams specification. For instance, it does not call onSubscribe on the Subscriber, and it calls onNext without checking whether there actually is any demand for the element. In general, writing your own implementation of the interfaces defined in the RS spec is a bad idea, as they are a lot more complex than meets the eye. And other RS implementations, like Reactor, do expect strict compliance, or you'll get very strange results.
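
As a rough illustration of those two contract points (onSubscribe first, onNext only against demand), the sketch below pushes data through an existing implementation instead of a hand-written Publisher. It uses the JDK's `java.util.concurrent.SubmissionPublisher` purely as a dependency-free stand-in; in a Reactor-based application one of Reactor's own types would play that role. `OneAtATimeSubscriber` and `PushBridgeDemo` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Subscriber that requests one element at a time, so demand is explicit. */
final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
    final List<String> received = new CopyOnWriteArrayList<>();
    final CountDownLatch finished = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        this.subscription = s; // the publisher calls this before any other signal
        s.request(1);          // signal initial demand
    }

    @Override
    public void onNext(String item) {
        received.add(item);
        subscription.request(1); // ask for the next element only when ready
    }

    @Override public void onError(Throwable t) { finished.countDown(); }
    @Override public void onComplete() { finished.countDown(); }
}

final class PushBridgeDemo {
    /** Pushes three items imperatively and returns what the subscriber saw. */
    static List<String> run() {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            // submit() buffers items and hands them over only as demand arrives
            publisher.submit("foo");
            publisher.submit("bar");
            publisher.submit("baz");
        } // close() completes the stream
        try {
            if (!subscriber.finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The write-then-close calling pattern of the sample code is preserved, but buffering, demand tracking, and signal ordering are left to the library.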

From your description, I think you are trying to use the output from one web request as the input of another request? If that is so, then this is probably what you're looking for:

String filename = ...

Flux<DataBuffer> requestBody = WebClient.create("http://example.com/")
		.get()
		.retrieve()
		.bodyToFlux(DataBuffer.class);

MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.asyncPart("file", requestBody, DataBuffer.class)
		.headers(h -> h.setContentDispositionFormData("file", filename));
MultiValueMap<String, HttpEntity<?>> body = builder.build();

WebClient.create("http://localhost:8080")
		.post()
		.syncBody(body)
		.exchange()
		... // do whatever with the response                

@pduartee

@poutsma Is it necessary to release the DataBuffers when using MultipartBodyBuilder.asyncPart()? I'm trying to find a way to do it after they are consumed, but I haven't found one yet.


3 participants