2222import java .nio .charset .Charset ;
2323import java .nio .charset .StandardCharsets ;
2424import java .nio .file .Path ;
25+ import java .util .Arrays ;
2526import java .util .Optional ;
2627import java .util .concurrent .CompletableFuture ;
2728import java .util .concurrent .ExecutorService ;
2829import org .reactivestreams .Publisher ;
2930import org .reactivestreams .Subscriber ;
3031import software .amazon .awssdk .annotations .SdkPublicApi ;
31- import software .amazon .awssdk .core .internal .async .ByteArrayAsyncRequestBody ;
32+ import software .amazon .awssdk .core .internal .async .ByteBuffersAsyncRequestBody ;
3233import software .amazon .awssdk .core .internal .async .FileAsyncRequestBody ;
3334import software .amazon .awssdk .core .internal .async .InputStreamWithExecutorAsyncRequestBody ;
3435import software .amazon .awssdk .core .internal .async .SplittingPublisher ;
3738import software .amazon .awssdk .utils .Validate ;
3839
3940/**
40- * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where
41- * this interface is the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber
42- * of the data (i.e. to write that data on the wire).
41+ * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is
42+ * the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber of the data (i.e.
43+ * to write that data on the wire).
4344 *
4445 * <p>
4546 * {@link #subscribe(Subscriber)} should be implemented to tie this publisher to a subscriber. Ideally each call to subscribe
46- * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a {@link
47- * org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the SDK. If
48- * the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
47+ * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a
48+ * {@link org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the
49+ * SDK. If the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
4950 * </p>
5051 *
5152 * <p>
52- * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations.
53- * The subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits
54- * for chunks will be notified via the {@link org.reactivestreams.Subscription#request(long)} method.
53+ * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. The
54+ * subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits for chunks will be
55+ * notified via the {@link org.reactivestreams.Subscription#request(long)} method.
5556 * </p>
5657 *
5758 * @see FileAsyncRequestBody
58- * @see ByteArrayAsyncRequestBody
59+ * @see ByteBuffersAsyncRequestBody
5960 */
6061@ SdkPublicApi
6162public interface AsyncRequestBody extends SdkPublisher <ByteBuffer > {
@@ -73,8 +74,8 @@ default String contentType() {
7374 }
7475
7576 /**
76- * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher.
77- * The data is delivered when the publisher publishes the data.
77+ * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. The data is delivered when the
78+ * publisher publishes the data.
7879 *
7980 * @param publisher Publisher of source data
8081 * @return Implementation of {@link AsyncRequestBody} that produces data send by the publisher
@@ -127,11 +128,11 @@ static AsyncRequestBody fromFile(File file) {
127128 * @param string The string to provide.
128129 * @param cs The {@link Charset} to use.
129130 * @return Implementation of {@link AsyncRequestBody} that uses the specified string.
130- * @see ByteArrayAsyncRequestBody
131+ * @see ByteBuffersAsyncRequestBody
131132 */
132133 static AsyncRequestBody fromString (String string , Charset cs ) {
133- return new ByteArrayAsyncRequestBody ( string . getBytes ( cs ),
134- Mimetype . MIMETYPE_TEXT_PLAIN + "; charset=" + cs . name ( ));
134+ return ByteBuffersAsyncRequestBody . from ( Mimetype . MIMETYPE_TEXT_PLAIN + "; charset=" + cs . name ( ),
135+ string . getBytes ( cs ));
135136 }
136137
137138 /**
@@ -146,29 +147,181 @@ static AsyncRequestBody fromString(String string) {
146147 }
147148
148149 /**
149- * Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the
150- * original byte array are not reflected in the {@link AsyncRequestBody}.
150+ * Creates an {@link AsyncRequestBody} from a byte array. This will copy the contents of the byte array to prevent
151+ * modifications to the provided byte array from being reflected in the {@link AsyncRequestBody}.
151152 *
152153 * @param bytes The bytes to send to the service.
153154 * @return AsyncRequestBody instance.
154155 */
155156 static AsyncRequestBody fromBytes (byte [] bytes ) {
156- return new ByteArrayAsyncRequestBody (bytes , Mimetype .MIMETYPE_OCTET_STREAM );
157+ byte [] clonedBytes = bytes .clone ();
158+ return ByteBuffersAsyncRequestBody .from (clonedBytes );
157159 }
158160
159161 /**
160- * Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications
161- * made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
162+ * Creates an {@link AsyncRequestBody} from a byte array <b>without</b> copying the contents of the byte array. This
163+ * introduces concurrency risks, allowing: (1) the caller to modify the byte array stored in this {@code AsyncRequestBody}
164+ * implementation AND (2) any users of {@link #fromBytesUnsafe(byte[])} to modify the byte array passed into this
165+ * {@code AsyncRequestBody} implementation.
166+ *
167+ * <p>As the method name implies, this is unsafe. Use {@link #fromBytes(byte[])} unless you're sure you know the risks.
168+ *
169+ * @param bytes The bytes to send to the service.
170+ * @return AsyncRequestBody instance.
171+ */
172+ static AsyncRequestBody fromBytesUnsafe (byte [] bytes ) {
173+ return ByteBuffersAsyncRequestBody .from (bytes );
174+ }
175+
176+ /**
177+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer}. This will copy the contents of the {@link ByteBuffer} to
178+ * prevent modifications to the provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
179+ * <p>
180+ * <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBuffer(ByteBuffer)} if you need
181+ * it to copy only the remaining readable bytes.
162182 *
163183 * @param byteBuffer ByteBuffer to send to the service.
164184 * @return AsyncRequestBody instance.
165185 */
166186 static AsyncRequestBody fromByteBuffer (ByteBuffer byteBuffer ) {
167- return fromBytes (BinaryUtils .copyAllBytesFrom (byteBuffer ));
187+ ByteBuffer immutableCopy = BinaryUtils .immutableCopyOf (byteBuffer );
188+ immutableCopy .rewind ();
189+ return ByteBuffersAsyncRequestBody .of ((long ) immutableCopy .remaining (), immutableCopy );
190+ }
191+
192+ /**
193+ * Creates an {@link AsyncRequestBody} from the remaining readable bytes from a {@link ByteBuffer}. This will copy the
194+ * remaining contents of the {@link ByteBuffer} to prevent modifications to the provided {@link ByteBuffer} from being
195+ * reflected in the {@link AsyncRequestBody}.
196+ * <p> Unlike {@link #fromByteBuffer(ByteBuffer)}, this method respects the current read position of the buffer and reads
197+ * only the remaining bytes.
198+ *
199+ * @param byteBuffer ByteBuffer to send to the service.
200+ * @return AsyncRequestBody instance.
201+ */
202+ static AsyncRequestBody fromRemainingByteBuffer (ByteBuffer byteBuffer ) {
203+ ByteBuffer immutableCopy = BinaryUtils .immutableCopyOfRemaining (byteBuffer );
204+ return ByteBuffersAsyncRequestBody .of ((long ) immutableCopy .remaining (), immutableCopy );
205+ }
206+
207+ /**
208+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
209+ * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
210+ * {@code AsyncRequestBody} implementation.
211+ * <p>
212+ * <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBufferUnsafe(ByteBuffer)} if you
213+ * need it to copy only the remaining readable bytes.
214+ *
215+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
216+ * risks.
217+ *
218+ * @param byteBuffer ByteBuffer to send to the service.
219+ * @return AsyncRequestBody instance.
220+ */
221+ static AsyncRequestBody fromByteBufferUnsafe (ByteBuffer byteBuffer ) {
222+ ByteBuffer readOnlyBuffer = byteBuffer .asReadOnlyBuffer ();
223+ readOnlyBuffer .rewind ();
224+ return ByteBuffersAsyncRequestBody .of ((long ) readOnlyBuffer .remaining (), readOnlyBuffer );
225+ }
226+
227+ /**
228+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
229+ * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
230+ * {@code AsyncRequestBody} implementation.
231+ * <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, this method respects the current read position of
232+ * the buffer and reads only the remaining bytes.
233+ *
234+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
235+ * risks.
236+ *
237+ * @param byteBuffer ByteBuffer to send to the service.
238+ * @return AsyncRequestBody instance.
239+ */
240+ static AsyncRequestBody fromRemainingByteBufferUnsafe (ByteBuffer byteBuffer ) {
241+ ByteBuffer readOnlyBuffer = byteBuffer .asReadOnlyBuffer ();
242+ return ByteBuffersAsyncRequestBody .of ((long ) readOnlyBuffer .remaining (), readOnlyBuffer );
243+ }
244+
245+ /**
246+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the contents of each {@link ByteBuffer}
247+ * to prevent modifications to any provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
248+ * <p>
249+ * <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
250+ * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
251+ *
252+ * @param byteBuffers ByteBuffer array to send to the service.
253+ * @return AsyncRequestBody instance.
254+ */
255+ static AsyncRequestBody fromByteBuffers (ByteBuffer ... byteBuffers ) {
256+ ByteBuffer [] immutableCopy = Arrays .stream (byteBuffers )
257+ .map (BinaryUtils ::immutableCopyOf )
258+ .peek (ByteBuffer ::rewind )
259+ .toArray (ByteBuffer []::new );
260+ return ByteBuffersAsyncRequestBody .of (immutableCopy );
261+ }
262+
263+ /**
264+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the remaining contents of each
265+ * {@link ByteBuffer} to prevent modifications to any provided {@link ByteBuffer} from being reflected in the
266+ * {@link AsyncRequestBody}.
267+ * <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)},
268+ * this method respects the current read position of each buffer and reads only the remaining bytes.
269+ *
270+ * @param byteBuffers ByteBuffer array to send to the service.
271+ * @return AsyncRequestBody instance.
272+ */
273+ static AsyncRequestBody fromRemainingByteBuffers (ByteBuffer ... byteBuffers ) {
274+ ByteBuffer [] immutableCopy = Arrays .stream (byteBuffers )
275+ .map (BinaryUtils ::immutableCopyOfRemaining )
276+ .peek (ByteBuffer ::rewind )
277+ .toArray (ByteBuffer []::new );
278+ return ByteBuffersAsyncRequestBody .of (immutableCopy );
279+ }
280+
281+ /**
282+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
283+ * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
284+ * {@code AsyncRequestBody} implementation.
285+ * <p>
286+ * <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
287+ * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
288+ *
289+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
290+ * risks.
291+ *
292+ * @param byteBuffers ByteBuffer array to send to the service.
293+ * @return AsyncRequestBody instance.
294+ */
295+ static AsyncRequestBody fromByteBuffersUnsafe (ByteBuffer ... byteBuffers ) {
296+ ByteBuffer [] readOnlyBuffers = Arrays .stream (byteBuffers )
297+ .map (ByteBuffer ::asReadOnlyBuffer )
298+ .peek (ByteBuffer ::rewind )
299+ .toArray (ByteBuffer []::new );
300+ return ByteBuffersAsyncRequestBody .of (readOnlyBuffers );
301+ }
302+
303+ /**
304+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
305+ * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
306+ * {@code AsyncRequestBody} implementation.
307+ * <p>Unlike {@link #fromByteBuffersUnsafe(ByteBuffer...)},
308+ * this method respects the current read position of each buffer and reads only the remaining bytes.
309+ *
310+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
311+ * risks.
312+ *
313+ * @param byteBuffers ByteBuffer array to send to the service.
314+ * @return AsyncRequestBody instance.
315+ */
316+ static AsyncRequestBody fromRemainingByteBuffersUnsafe (ByteBuffer ... byteBuffers ) {
317+ ByteBuffer [] readOnlyBuffers = Arrays .stream (byteBuffers )
318+ .map (ByteBuffer ::asReadOnlyBuffer )
319+ .toArray (ByteBuffer []::new );
320+ return ByteBuffersAsyncRequestBody .of (readOnlyBuffers );
168321 }
169322
170323 /**
171- * Creates a {@link AsyncRequestBody} from a {@link InputStream}.
324+ * Creates an {@link AsyncRequestBody} from an {@link InputStream}.
172325 *
173326 * <p>An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the
174327 * non-blocking event loop threads owned by the SDK.
@@ -242,7 +395,7 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content
242395 }
243396
244397 /**
245- * Creates a {@link AsyncRequestBody} with no content.
398+ * Creates an {@link AsyncRequestBody} with no content.
246399 *
247400 * @return AsyncRequestBody instance.
248401 */
0 commit comments