2222import java .nio .charset .Charset ;
2323import java .nio .charset .StandardCharsets ;
2424import java .nio .file .Path ;
25+ import java .util .Arrays ;
2526import java .util .Optional ;
2627import java .util .concurrent .ExecutorService ;
2728import org .reactivestreams .Publisher ;
2829import org .reactivestreams .Subscriber ;
2930import software .amazon .awssdk .annotations .SdkPublicApi ;
30- import software .amazon .awssdk .core .internal .async .ByteArrayAsyncRequestBody ;
31+ import software .amazon .awssdk .core .internal .async .ByteBuffersAsyncRequestBody ;
3132import software .amazon .awssdk .core .internal .async .FileAsyncRequestBody ;
3233import software .amazon .awssdk .core .internal .async .InputStreamWithExecutorAsyncRequestBody ;
3334import software .amazon .awssdk .core .internal .util .Mimetype ;
3435import software .amazon .awssdk .utils .BinaryUtils ;
3536
3637/**
37- * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where
38- * this interface is the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber
39- * of the data (i.e. to write that data on the wire).
38+ * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is
39+ * the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber of the data (i.e.
40+ * to write that data on the wire).
4041 *
4142 * <p>
4243 * {@link #subscribe(Subscriber)} should be implemented to tie this publisher to a subscriber. Ideally each call to subscribe
43- * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a {@link
44- * org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the SDK. If
45- * the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
44+ * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a
45+ * {@link org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the
46+ * SDK. If the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
4647 * </p>
4748 *
4849 * <p>
49- * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations.
50- * The subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits
51- * for chunks will be notified via the {@link org.reactivestreams.Subscription#request(long)} method.
50+ * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. The
51+ * subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits for chunks will be
52+ * notified via the {@link org.reactivestreams.Subscription#request(long)} method.
5253 * </p>
5354 *
5455 * @see FileAsyncRequestBody
55- * @see ByteArrayAsyncRequestBody
56+ * @see ByteBuffersAsyncRequestBody
5657 */
5758@ SdkPublicApi
5859public interface AsyncRequestBody extends SdkPublisher <ByteBuffer > {
@@ -70,8 +71,8 @@ default String contentType() {
7071 }
7172
7273 /**
73- * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher.
74- * The data is delivered when the publisher publishes the data.
74+ * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. The data is delivered when the
75+ * publisher publishes the data.
7576 *
7677 * @param publisher Publisher of source data
7778 * @return Implementation of {@link AsyncRequestBody} that produces data send by the publisher
@@ -124,11 +125,11 @@ static AsyncRequestBody fromFile(File file) {
124125 * @param string The string to provide.
125126 * @param cs The {@link Charset} to use.
126127 * @return Implementation of {@link AsyncRequestBody} that uses the specified string.
127- * @see ByteArrayAsyncRequestBody
128+ * @see ByteBuffersAsyncRequestBody
128129 */
129130 static AsyncRequestBody fromString (String string , Charset cs ) {
130- return new ByteArrayAsyncRequestBody ( string . getBytes ( cs ),
131- Mimetype . MIMETYPE_TEXT_PLAIN + "; charset=" + cs . name ( ));
131+ return ByteBuffersAsyncRequestBody . from ( Mimetype . MIMETYPE_TEXT_PLAIN + "; charset=" + cs . name ( ),
132+ string . getBytes ( cs ));
132133 }
133134
134135 /**
@@ -143,29 +144,181 @@ static AsyncRequestBody fromString(String string) {
143144 }
144145
145146 /**
146- * Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the
147- * original byte array are not reflected in the {@link AsyncRequestBody}.
147+ * Creates an {@link AsyncRequestBody} from a byte array. This will copy the contents of the byte array to prevent
148+ * modifications to the provided byte array from being reflected in the {@link AsyncRequestBody}.
148149 *
149150 * @param bytes The bytes to send to the service.
150151 * @return AsyncRequestBody instance.
151152 */
152153 static AsyncRequestBody fromBytes (byte [] bytes ) {
153- return new ByteArrayAsyncRequestBody (bytes , Mimetype .MIMETYPE_OCTET_STREAM );
154+ byte [] clonedBytes = bytes .clone ();
155+ return ByteBuffersAsyncRequestBody .from (clonedBytes );
154156 }
155157
156158 /**
157- * Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications
158- * made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
159+ * Creates an {@link AsyncRequestBody} from a byte array <b>without</b> copying the contents of the byte array. This
160+ * introduces concurrency risks, allowing: (1) the caller to modify the byte array stored in this {@code AsyncRequestBody}
161+ * implementation AND (2) any users of {@link #fromBytesUnsafe(byte[])} to modify the byte array passed into this
162+ * {@code AsyncRequestBody} implementation.
163+ *
164+ * <p>As the method name implies, this is unsafe. Use {@link #fromBytes(byte[])} unless you're sure you know the risks.
165+ *
166+ * @param bytes The bytes to send to the service.
167+ * @return AsyncRequestBody instance.
168+ */
169+ static AsyncRequestBody fromBytesUnsafe (byte [] bytes ) {
170+ return ByteBuffersAsyncRequestBody .from (bytes );
171+ }
172+
173+ /**
174+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer}. This will copy the contents of the {@link ByteBuffer} to
175+ * prevent modifications to the provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
176+ * <p>
177+ * <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBuffer(ByteBuffer)} if you need
178+ * it to copy only the remaining readable bytes.
159179 *
160180 * @param byteBuffer ByteBuffer to send to the service.
161181 * @return AsyncRequestBody instance.
162182 */
163183 static AsyncRequestBody fromByteBuffer (ByteBuffer byteBuffer ) {
164- return fromBytes (BinaryUtils .copyAllBytesFrom (byteBuffer ));
184+ ByteBuffer immutableCopy = BinaryUtils .immutableCopyOf (byteBuffer );
185+ immutableCopy .rewind ();
186+ return ByteBuffersAsyncRequestBody .of ((long ) immutableCopy .remaining (), immutableCopy );
187+ }
188+
189+ /**
190+ * Creates an {@link AsyncRequestBody} from the remaining readable bytes from a {@link ByteBuffer}. This will copy the
191+ * remaining contents of the {@link ByteBuffer} to prevent modifications to the provided {@link ByteBuffer} from being
192+ * reflected in the {@link AsyncRequestBody}.
193+ * <p> Unlike {@link #fromByteBuffer(ByteBuffer)}, this method respects the current read position of the buffer and reads
194+ * only the remaining bytes.
195+ *
196+ * @param byteBuffer ByteBuffer to send to the service.
197+ * @return AsyncRequestBody instance.
198+ */
199+ static AsyncRequestBody fromRemainingByteBuffer (ByteBuffer byteBuffer ) {
200+ ByteBuffer immutableCopy = BinaryUtils .immutableCopyOfRemaining (byteBuffer );
201+ return ByteBuffersAsyncRequestBody .of ((long ) immutableCopy .remaining (), immutableCopy );
202+ }
203+
204+ /**
205+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
206+ * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
207+ * {@code AsyncRequestBody} implementation.
208+ * <p>
209+ * <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBufferUnsafe(ByteBuffer)} if you
210+ * need it to copy only the remaining readable bytes.
211+ *
212+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
213+ * risks.
214+ *
215+ * @param byteBuffer ByteBuffer to send to the service.
216+ * @return AsyncRequestBody instance.
217+ */
218+ static AsyncRequestBody fromByteBufferUnsafe (ByteBuffer byteBuffer ) {
219+ ByteBuffer readOnlyBuffer = byteBuffer .asReadOnlyBuffer ();
220+ readOnlyBuffer .rewind ();
221+ return ByteBuffersAsyncRequestBody .of ((long ) readOnlyBuffer .remaining (), readOnlyBuffer );
222+ }
223+
224+ /**
225+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
226+ * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
227+ * {@code AsyncRequestBody} implementation.
228+ * <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, this method respects the current read position of
229+ * the buffer and reads only the remaining bytes.
230+ *
231+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
232+ * risks.
233+ *
234+ * @param byteBuffer ByteBuffer to send to the service.
235+ * @return AsyncRequestBody instance.
236+ */
237+ static AsyncRequestBody fromRemainingByteBufferUnsafe (ByteBuffer byteBuffer ) {
238+ ByteBuffer readOnlyBuffer = byteBuffer .asReadOnlyBuffer ();
239+ return ByteBuffersAsyncRequestBody .of ((long ) readOnlyBuffer .remaining (), readOnlyBuffer );
240+ }
241+
242+ /**
243+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the contents of each {@link ByteBuffer}
244+ * to prevent modifications to any provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
245+ * <p>
246+ * <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
247+ * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
248+ *
249+ * @param byteBuffers ByteBuffer array to send to the service.
250+ * @return AsyncRequestBody instance.
251+ */
252+ static AsyncRequestBody fromByteBuffers (ByteBuffer ... byteBuffers ) {
253+ ByteBuffer [] immutableCopy = Arrays .stream (byteBuffers )
254+ .map (BinaryUtils ::immutableCopyOf )
255+ .peek (ByteBuffer ::rewind )
256+ .toArray (ByteBuffer []::new );
257+ return ByteBuffersAsyncRequestBody .of (immutableCopy );
258+ }
259+
260+ /**
261+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the remaining contents of each
262+ * {@link ByteBuffer} to prevent modifications to any provided {@link ByteBuffer} from being reflected in the
263+ * {@link AsyncRequestBody}.
264+ * <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)},
265+ * this method respects the current read position of each buffer and reads only the remaining bytes.
266+ *
267+ * @param byteBuffers ByteBuffer array to send to the service.
268+ * @return AsyncRequestBody instance.
269+ */
270+ static AsyncRequestBody fromRemainingByteBuffers (ByteBuffer ... byteBuffers ) {
271+ ByteBuffer [] immutableCopy = Arrays .stream (byteBuffers )
272+ .map (BinaryUtils ::immutableCopyOfRemaining )
273+ .peek (ByteBuffer ::rewind )
274+ .toArray (ByteBuffer []::new );
275+ return ByteBuffersAsyncRequestBody .of (immutableCopy );
276+ }
277+
278+ /**
279+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
280+ * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
281+ * {@code AsyncRequestBody} implementation.
282+ * <p>
283+ * <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
284+ * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
285+ *
286+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
287+ * risks.
288+ *
289+ * @param byteBuffers ByteBuffer array to send to the service.
290+ * @return AsyncRequestBody instance.
291+ */
292+ static AsyncRequestBody fromByteBuffersUnsafe (ByteBuffer ... byteBuffers ) {
293+ ByteBuffer [] readOnlyBuffers = Arrays .stream (byteBuffers )
294+ .map (ByteBuffer ::asReadOnlyBuffer )
295+ .peek (ByteBuffer ::rewind )
296+ .toArray (ByteBuffer []::new );
297+ return ByteBuffersAsyncRequestBody .of (readOnlyBuffers );
298+ }
299+
300+ /**
301+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
302+ * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
303+ * {@code AsyncRequestBody} implementation.
304+ * <p>Unlike {@link #fromByteBuffersUnsafe(ByteBuffer...)},
305+ * this method respects the current read position of each buffer and reads only the remaining bytes.
306+ *
307+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
308+ * risks.
309+ *
310+ * @param byteBuffers ByteBuffer array to send to the service.
311+ * @return AsyncRequestBody instance.
312+ */
313+ static AsyncRequestBody fromRemainingByteBuffersUnsafe (ByteBuffer ... byteBuffers ) {
314+ ByteBuffer [] readOnlyBuffers = Arrays .stream (byteBuffers )
315+ .map (ByteBuffer ::asReadOnlyBuffer )
316+ .toArray (ByteBuffer []::new );
317+ return ByteBuffersAsyncRequestBody .of (readOnlyBuffers );
165318 }
166319
167320 /**
168- * Creates a {@link AsyncRequestBody} from a {@link InputStream}.
321+ * Creates an {@link AsyncRequestBody} from an {@link InputStream}.
169322 *
170323 * <p>An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the
171324 * non-blocking event loop threads owned by the SDK.
@@ -239,7 +392,7 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content
239392 }
240393
241394 /**
242- * Creates a {@link AsyncRequestBody} with no content.
395+ * Creates an {@link AsyncRequestBody} with no content.
243396 *
244397 * @return AsyncRequestBody instance.
245398 */
0 commit comments