@@ -56,6 +56,7 @@ public abstract class DataBufferUtils {
56
56
57
57
private static final Consumer <DataBuffer > RELEASE_CONSUMER = DataBufferUtils ::release ;
58
58
59
+
59
60
//---------------------------------------------------------------------
60
61
// Reading
61
62
//---------------------------------------------------------------------
@@ -74,31 +75,31 @@ public abstract class DataBufferUtils {
74
75
* {@link #readInputStream(Callable, DataBufferFactory, int)}, to be removed in Spring 5.1
75
76
*/
76
77
@ Deprecated
77
- public static Flux <DataBuffer > read (InputStream inputStream ,
78
- DataBufferFactory dataBufferFactory , int bufferSize ) {
78
+ public static Flux <DataBuffer > read (
79
+ InputStream inputStream , DataBufferFactory dataBufferFactory , int bufferSize ) {
80
+
79
81
return readInputStream (() -> inputStream , dataBufferFactory , bufferSize );
80
82
}
81
83
82
84
/**
83
- * Obtain a {@link InputStream} from the given supplier, and read it into a {@code Flux} of
84
- * {@code DataBuffer}s. Closes the input stream when the flux is terminated.
85
+ * Obtain a {@link InputStream} from the given supplier, and read it into a {@code Flux}
86
+ * of {@code DataBuffer}s. Closes the input stream when the flux is terminated.
85
87
* @param inputStreamSupplier the supplier for the input stream to read from
86
88
* @param dataBufferFactory the factory to create data buffers with
87
89
* @param bufferSize the maximum size of the data buffers
88
90
* @return a flux of data buffers read from the given channel
89
91
*/
90
- public static Flux <DataBuffer > readInputStream (Callable < InputStream > inputStreamSupplier ,
91
- DataBufferFactory dataBufferFactory , int bufferSize ) {
92
+ public static Flux <DataBuffer > readInputStream (
93
+ Callable < InputStream > inputStreamSupplier , DataBufferFactory dataBufferFactory , int bufferSize ) {
92
94
93
95
Assert .notNull (inputStreamSupplier , "'inputStreamSupplier' must not be null" );
94
96
95
- return readByteChannel (() -> Channels .newChannel (inputStreamSupplier .call ()),
96
- dataBufferFactory , bufferSize );
97
+ return readByteChannel (() -> Channels .newChannel (inputStreamSupplier .call ()), dataBufferFactory , bufferSize );
97
98
}
98
99
99
100
/**
100
- * Read the given {@code ReadableByteChannel} into a <strong>read-once</strong> {@code Flux} of
101
- * {@code DataBuffer}s. Closes the channel when the flux is terminated.
101
+ * Read the given {@code ReadableByteChannel} into a <strong>read-once</strong> {@code Flux}
102
+ * of {@code DataBuffer}s. Closes the channel when the flux is terminated.
102
103
* <p>The resulting {@code Flux} can only be subscribed to once. See
103
104
* {@link #readByteChannel(Callable, DataBufferFactory, int)} for a variant that supports
104
105
* multiple subscriptions.
@@ -110,8 +111,9 @@ public static Flux<DataBuffer> readInputStream(Callable<InputStream> inputStream
110
111
* {@link #readByteChannel(Callable, DataBufferFactory, int)}, to be removed in Spring 5.1
111
112
*/
112
113
@ Deprecated
113
- public static Flux <DataBuffer > read (ReadableByteChannel channel ,
114
- DataBufferFactory dataBufferFactory , int bufferSize ) {
114
+ public static Flux <DataBuffer > read (
115
+ ReadableByteChannel channel , DataBufferFactory dataBufferFactory , int bufferSize ) {
116
+
115
117
return readByteChannel (() -> channel , dataBufferFactory , bufferSize );
116
118
}
117
119
@@ -123,8 +125,8 @@ public static Flux<DataBuffer> read(ReadableByteChannel channel,
123
125
* @param bufferSize the maximum size of the data buffers
124
126
* @return a flux of data buffers read from the given channel
125
127
*/
126
- public static Flux <DataBuffer > readByteChannel (Callable < ReadableByteChannel > channelSupplier ,
127
- DataBufferFactory dataBufferFactory , int bufferSize ) {
128
+ public static Flux <DataBuffer > readByteChannel (
129
+ Callable < ReadableByteChannel > channelSupplier , DataBufferFactory dataBufferFactory , int bufferSize ) {
128
130
129
131
Assert .notNull (channelSupplier , "'channelSupplier' must not be null" );
130
132
Assert .notNull (dataBufferFactory , "'dataBufferFactory' must not be null" );
@@ -156,8 +158,9 @@ public static Flux<DataBuffer> readByteChannel(Callable<ReadableByteChannel> cha
156
158
* Spring 5.1
157
159
*/
158
160
@ Deprecated
159
- public static Flux <DataBuffer > read (AsynchronousFileChannel channel ,
160
- DataBufferFactory dataBufferFactory , int bufferSize ) {
161
+ public static Flux <DataBuffer > read (
162
+ AsynchronousFileChannel channel , DataBufferFactory dataBufferFactory , int bufferSize ) {
163
+
161
164
return readAsynchronousFileChannel (() -> channel , dataBufferFactory , bufferSize );
162
165
}
163
166
@@ -178,8 +181,9 @@ public static Flux<DataBuffer> read(AsynchronousFileChannel channel,
178
181
* in Spring 5.1
179
182
*/
180
183
@ Deprecated
181
- public static Flux <DataBuffer > read (AsynchronousFileChannel channel ,
182
- long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
184
+ public static Flux <DataBuffer > read (
185
+ AsynchronousFileChannel channel , long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
186
+
183
187
return readAsynchronousFileChannel (() -> channel , position , dataBufferFactory , bufferSize );
184
188
}
185
189
@@ -192,24 +196,22 @@ public static Flux<DataBuffer> read(AsynchronousFileChannel channel,
192
196
* @return a flux of data buffers read from the given channel
193
197
*/
194
198
public static Flux <DataBuffer > readAsynchronousFileChannel (
195
- Callable <AsynchronousFileChannel > channelSupplier ,
196
- DataBufferFactory dataBufferFactory , int bufferSize ) {
199
+ Callable <AsynchronousFileChannel > channelSupplier , DataBufferFactory dataBufferFactory , int bufferSize ) {
197
200
198
201
return readAsynchronousFileChannel (channelSupplier , 0 , dataBufferFactory , bufferSize );
199
202
}
200
203
201
204
/**
202
205
* Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
203
- * {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the channel when
204
- * the flux is terminated.
206
+ * {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the
207
+ * channel when the flux is terminated.
205
208
* @param channelSupplier the supplier for the channel to read from
206
209
* @param position the position to start reading from
207
210
* @param dataBufferFactory the factory to create data buffers with
208
211
* @param bufferSize the maximum size of the data buffers
209
212
* @return a flux of data buffers read from the given channel
210
213
*/
211
- public static Flux <DataBuffer > readAsynchronousFileChannel (
212
- Callable <AsynchronousFileChannel > channelSupplier ,
214
+ public static Flux <DataBuffer > readAsynchronousFileChannel (Callable <AsynchronousFileChannel > channelSupplier ,
213
215
long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
214
216
215
217
Assert .notNull (channelSupplier , "'channelSupplier' must not be null" );
@@ -242,8 +244,8 @@ public static Flux<DataBuffer> readAsynchronousFileChannel(
242
244
* @param bufferSize the maximum size of the data buffers
243
245
* @return a flux of data buffers read from the given channel
244
246
*/
245
- public static Flux <DataBuffer > read (Resource resource ,
246
- DataBufferFactory dataBufferFactory , int bufferSize ) {
247
+ public static Flux <DataBuffer > read (
248
+ Resource resource , DataBufferFactory dataBufferFactory , int bufferSize ) {
247
249
248
250
return read (resource , 0 , dataBufferFactory , bufferSize );
249
251
}
@@ -262,13 +264,12 @@ public static Flux<DataBuffer> read(Resource resource,
262
264
* @param bufferSize the maximum size of the data buffers
263
265
* @return a flux of data buffers read from the given channel
264
266
*/
265
- public static Flux <DataBuffer > read (Resource resource , long position ,
266
- DataBufferFactory dataBufferFactory , int bufferSize ) {
267
+ public static Flux <DataBuffer > read (
268
+ Resource resource , long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
267
269
268
270
try {
269
271
if (resource .isFile ()) {
270
272
File file = resource .getFile ();
271
-
272
273
return readAsynchronousFileChannel (
273
274
() -> AsynchronousFileChannel .open (file .toPath (), StandardOpenOption .READ ),
274
275
position , dataBufferFactory , bufferSize );
@@ -283,8 +284,6 @@ public static Flux<DataBuffer> read(Resource resource, long position,
283
284
}
284
285
285
286
286
-
287
-
288
287
//---------------------------------------------------------------------
289
288
// Writing
290
289
//---------------------------------------------------------------------
@@ -295,16 +294,13 @@ public static Flux<DataBuffer> read(Resource resource, long position,
295
294
* <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
296
295
* source. If releasing is required, then subscribe to the returned {@code Flux} with a
297
296
* {@link #releaseConsumer()}.
298
- * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed
299
- * to.
297
+ * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
300
298
* @param source the stream of data buffers to be written
301
299
* @param outputStream the output stream to write to
302
300
* @return a flux containing the same buffers as in {@code source}, that starts the writing
303
301
* process when subscribed to, and that publishes any writing errors and the completion signal
304
302
*/
305
- public static Flux <DataBuffer > write (Publisher <DataBuffer > source ,
306
- OutputStream outputStream ) {
307
-
303
+ public static Flux <DataBuffer > write (Publisher <DataBuffer > source , OutputStream outputStream ) {
308
304
Assert .notNull (source , "'source' must not be null" );
309
305
Assert .notNull (outputStream , "'outputStream' must not be null" );
310
306
@@ -318,21 +314,17 @@ public static Flux<DataBuffer> write(Publisher<DataBuffer> source,
318
314
* <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
319
315
* source. If releasing is required, then subscribe to the returned {@code Flux} with a
320
316
* {@link #releaseConsumer()}.
321
- * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed
322
- * to.
317
+ * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
323
318
* @param source the stream of data buffers to be written
324
319
* @param channel the channel to write to
325
320
* @return a flux containing the same buffers as in {@code source}, that starts the writing
326
321
* process when subscribed to, and that publishes any writing errors and the completion signal
327
322
*/
328
- public static Flux <DataBuffer > write (Publisher <DataBuffer > source ,
329
- WritableByteChannel channel ) {
330
-
323
+ public static Flux <DataBuffer > write (Publisher <DataBuffer > source , WritableByteChannel channel ) {
331
324
Assert .notNull (source , "'source' must not be null" );
332
325
Assert .notNull (channel , "'channel' must not be null" );
333
326
334
327
Flux <DataBuffer > flux = Flux .from (source );
335
-
336
328
return Flux .create (sink ->
337
329
flux .subscribe (dataBuffer -> {
338
330
try {
@@ -357,40 +349,36 @@ public static Flux<DataBuffer> write(Publisher<DataBuffer> source,
357
349
* <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
358
350
* source. If releasing is required, then subscribe to the returned {@code Flux} with a
359
351
* {@link #releaseConsumer()}.
360
- * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed
361
- * to.
352
+ * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
362
353
* @param source the stream of data buffers to be written
363
354
* @param channel the channel to write to
364
355
* @return a flux containing the same buffers as in {@code source}, that starts the writing
365
356
* process when subscribed to, and that publishes any writing errors and the completion signal
366
357
*/
367
- public static Flux <DataBuffer > write (Publisher < DataBuffer > source , AsynchronousFileChannel channel ,
368
- long position ) {
358
+ public static Flux <DataBuffer > write (
359
+ Publisher < DataBuffer > source , AsynchronousFileChannel channel , long position ) {
369
360
370
361
Assert .notNull (source , "'source' must not be null" );
371
362
Assert .notNull (channel , "'channel' must not be null" );
372
363
Assert .isTrue (position >= 0 , "'position' must be >= 0" );
373
364
374
365
Flux <DataBuffer > flux = Flux .from (source );
375
-
376
366
return Flux .create (sink -> {
377
- BaseSubscriber <DataBuffer > subscriber =
378
- new AsynchronousFileChannelWriteCompletionHandler (sink , channel , position );
379
- flux .subscribe (subscriber );
367
+ flux .subscribe (new AsynchronousFileChannelWriteCompletionHandler (sink , channel , position ));
380
368
});
381
369
}
382
370
383
-
384
371
private static void closeChannel (@ Nullable Channel channel ) {
385
- try {
386
- if ( channel != null && channel . isOpen ()) {
372
+ if ( channel != null && channel . isOpen ()) {
373
+ try {
387
374
channel .close ();
388
375
}
389
- }
390
- catch ( IOException ignored ) {
376
+ catch ( IOException ignored ) {
377
+ }
391
378
}
392
379
}
393
380
381
+
394
382
//---------------------------------------------------------------------
395
383
// Various
396
384
//---------------------------------------------------------------------
@@ -484,10 +472,7 @@ public static <T extends DataBuffer> T retain(T dataBuffer) {
484
472
* @return {@code true} if the buffer was released; {@code false} otherwise.
485
473
*/
486
474
public static boolean release (@ Nullable DataBuffer dataBuffer ) {
487
- if (dataBuffer instanceof PooledDataBuffer ) {
488
- return ((PooledDataBuffer ) dataBuffer ).release ();
489
- }
490
- return false ;
475
+ return (dataBuffer instanceof PooledDataBuffer && ((PooledDataBuffer ) dataBuffer ).release ());
491
476
}
492
477
493
478
/**
@@ -520,18 +505,16 @@ public static Mono<DataBuffer> join(Publisher<DataBuffer> dataBuffers) {
520
505
}
521
506
522
507
523
- private static class ReadableByteChannelGenerator
524
- implements Consumer <SynchronousSink <DataBuffer >> {
508
+ private static class ReadableByteChannelGenerator implements Consumer <SynchronousSink <DataBuffer >> {
525
509
526
510
private final ReadableByteChannel channel ;
527
511
528
512
private final DataBufferFactory dataBufferFactory ;
529
513
530
514
private final int bufferSize ;
531
515
532
-
533
- public ReadableByteChannelGenerator (ReadableByteChannel channel ,
534
- DataBufferFactory dataBufferFactory , int bufferSize ) {
516
+ public ReadableByteChannelGenerator (
517
+ ReadableByteChannel channel , DataBufferFactory dataBufferFactory , int bufferSize ) {
535
518
536
519
this .channel = channel ;
537
520
this .dataBufferFactory = dataBufferFactory ;
@@ -563,7 +546,6 @@ public void accept(SynchronousSink<DataBuffer> sink) {
563
546
}
564
547
}
565
548
}
566
-
567
549
}
568
550
569
551
@@ -582,10 +564,9 @@ private static class AsynchronousFileChannelReadCompletionHandler
582
564
583
565
private final AtomicBoolean disposed = new AtomicBoolean ();
584
566
567
+ public AsynchronousFileChannelReadCompletionHandler (AsynchronousFileChannel channel ,
568
+ FluxSink <DataBuffer > sink , long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
585
569
586
- private AsynchronousFileChannelReadCompletionHandler (
587
- AsynchronousFileChannel channel , FluxSink <DataBuffer > sink ,
588
- long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
589
570
this .channel = channel ;
590
571
this .sink = sink ;
591
572
this .position = new AtomicLong (position );
@@ -599,10 +580,8 @@ public void completed(Integer read, DataBuffer dataBuffer) {
599
580
long pos = this .position .addAndGet (read );
600
581
dataBuffer .writePosition (read );
601
582
this .sink .next (dataBuffer );
602
-
603
583
if (!this .disposed .get ()) {
604
- DataBuffer newDataBuffer =
605
- this .dataBufferFactory .allocateBuffer (this .bufferSize );
584
+ DataBuffer newDataBuffer = this .dataBufferFactory .allocateBuffer (this .bufferSize );
606
585
ByteBuffer newByteBuffer = newDataBuffer .asByteBuffer (0 , this .bufferSize );
607
586
this .channel .read (newByteBuffer , pos , newDataBuffer , this );
608
587
}
@@ -618,12 +597,10 @@ public void failed(Throwable exc, DataBuffer dataBuffer) {
618
597
release (dataBuffer );
619
598
this .sink .error (exc );
620
599
}
621
-
622
600
}
623
601
624
602
625
- private static class AsynchronousFileChannelWriteCompletionHandler
626
- extends BaseSubscriber <DataBuffer >
603
+ private static class AsynchronousFileChannelWriteCompletionHandler extends BaseSubscriber <DataBuffer >
627
604
implements CompletionHandler <Integer , ByteBuffer > {
628
605
629
606
private final FluxSink <DataBuffer > sink ;
@@ -639,6 +616,7 @@ private static class AsynchronousFileChannelWriteCompletionHandler
639
616
640
617
public AsynchronousFileChannelWriteCompletionHandler (
641
618
FluxSink <DataBuffer > sink , AsynchronousFileChannel channel , long position ) {
619
+
642
620
this .sink = sink ;
643
621
this .channel = channel ;
644
622
this .position = new AtomicLong (position );
@@ -653,7 +631,6 @@ protected void hookOnSubscribe(Subscription subscription) {
653
631
protected void hookOnNext (DataBuffer value ) {
654
632
this .dataBuffer = value ;
655
633
ByteBuffer byteBuffer = value .asByteBuffer ();
656
-
657
634
this .channel .write (byteBuffer , this .position .get (), byteBuffer , this );
658
635
}
659
636
@@ -678,7 +655,6 @@ public void completed(Integer written, ByteBuffer byteBuffer) {
678
655
this .channel .write (byteBuffer , pos , byteBuffer , this );
679
656
return ;
680
657
}
681
-
682
658
if (this .dataBuffer != null ) {
683
659
this .sink .next (this .dataBuffer );
684
660
this .dataBuffer = null ;
@@ -696,4 +672,5 @@ public void failed(Throwable exc, ByteBuffer byteBuffer) {
696
672
this .sink .error (exc );
697
673
}
698
674
}
675
+
699
676
}
0 commit comments