31
31
import java .util .concurrent .Callable ;
32
32
import java .util .concurrent .atomic .AtomicBoolean ;
33
33
import java .util .concurrent .atomic .AtomicLong ;
34
+ import java .util .concurrent .atomic .AtomicReference ;
34
35
import java .util .function .Consumer ;
35
36
import java .util .function .IntPredicate ;
36
37
@@ -336,6 +337,7 @@ public static Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteC
336
337
sink .next (dataBuffer );
337
338
}
338
339
catch (IOException ex ) {
340
+ sink .next (dataBuffer );
339
341
sink .error (ex );
340
342
}
341
343
@@ -355,6 +357,26 @@ public static Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteC
355
357
* @param channel the channel to write to
356
358
* @return a flux containing the same buffers as in {@code source}, that starts the writing
357
359
* process when subscribed to, and that publishes any writing errors and the completion signal
360
+ * @since 5.1
361
+ */
362
+ public static Flux <DataBuffer > write (
363
+ Publisher <DataBuffer > source , AsynchronousFileChannel channel ) {
364
+ return write (source , channel , 0 );
365
+ }
366
+
367
+
368
+ /**
369
+ * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code AsynchronousFileChannel}.
370
+ * Does <strong>not</strong> close the channel when the flux is terminated, and does
371
+ * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
372
+ * source. If releasing is required, then subscribe to the returned {@code Flux} with a
373
+ * {@link #releaseConsumer()}.
374
+ * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
375
+ * @param source the stream of data buffers to be written
376
+ * @param channel the channel to write to
377
+ * @param position the file position at which the write is to begin; must be non-negative
378
+ * @return a flux containing the same buffers as in {@code source}, that starts the writing
379
+ * process when subscribed to, and that publishes any writing errors and the completion signal
358
380
*/
359
381
public static Flux <DataBuffer > write (
360
382
Publisher <DataBuffer > source , AsynchronousFileChannel channel , long position ) {
@@ -610,10 +632,11 @@ private static class AsynchronousFileChannelWriteCompletionHandler extends BaseS
610
632
611
633
private final AtomicBoolean completed = new AtomicBoolean ();
612
634
635
+ private final AtomicReference <Throwable > error = new AtomicReference <>();
636
+
613
637
private final AtomicLong position ;
614
638
615
- @ Nullable
616
- private DataBuffer dataBuffer ;
639
+ private final AtomicReference <DataBuffer > dataBuffer = new AtomicReference <>();
617
640
618
641
public AsynchronousFileChannelWriteCompletionHandler (
619
642
FluxSink <DataBuffer > sink , AsynchronousFileChannel channel , long position ) {
@@ -630,21 +653,27 @@ protected void hookOnSubscribe(Subscription subscription) {
630
653
631
654
@ Override
632
655
protected void hookOnNext (DataBuffer value ) {
633
- this .dataBuffer = value ;
656
+ if (!this .dataBuffer .compareAndSet (null , value )) {
657
+ throw new IllegalStateException ();
658
+ }
634
659
ByteBuffer byteBuffer = value .asByteBuffer ();
635
660
this .channel .write (byteBuffer , this .position .get (), byteBuffer , this );
636
661
}
637
662
638
663
@ Override
639
664
protected void hookOnError (Throwable throwable ) {
640
- this .sink .error (throwable );
665
+ this .error .set (throwable );
666
+
667
+ if (this .dataBuffer .get () == null ) {
668
+ this .sink .error (throwable );
669
+ }
641
670
}
642
671
643
672
@ Override
644
673
protected void hookOnComplete () {
645
674
this .completed .set (true );
646
675
647
- if (this .dataBuffer == null ) {
676
+ if (this .dataBuffer . get () == null ) {
648
677
this .sink .complete ();
649
678
}
650
679
}
@@ -656,11 +685,13 @@ public void completed(Integer written, ByteBuffer byteBuffer) {
656
685
this .channel .write (byteBuffer , pos , byteBuffer , this );
657
686
return ;
658
687
}
659
- if (this .dataBuffer != null ) {
660
- this .sink .next (this .dataBuffer );
661
- this .dataBuffer = null ;
688
+ sinkDataBuffer ();
689
+
690
+ Throwable throwable = this .error .get ();
691
+ if (throwable != null ) {
692
+ this .sink .error (throwable );
662
693
}
663
- if (this .completed .get ()) {
694
+ else if (this .completed .get ()) {
664
695
this .sink .complete ();
665
696
}
666
697
else {
@@ -670,8 +701,16 @@ public void completed(Integer written, ByteBuffer byteBuffer) {
670
701
671
702
@ Override
672
703
public void failed (Throwable exc , ByteBuffer byteBuffer ) {
704
+ sinkDataBuffer ();
673
705
this .sink .error (exc );
674
706
}
707
+
708
+ private void sinkDataBuffer () {
709
+ DataBuffer dataBuffer = this .dataBuffer .get ();
710
+ Assert .state (dataBuffer != null , "DataBuffer should not be null" );
711
+ this .sink .next (dataBuffer );
712
+ this .dataBuffer .set (null );
713
+ }
675
714
}
676
715
677
716
/**
0 commit comments