diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java index 767eae8dde..0a81c36a7c 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java @@ -111,6 +111,16 @@ interface AppendableUploadWriteableByteChannel extends WritableByteChannel { * operation upon this channel, however, then an invocation of this method will block until the * first operation is complete. * + *
If your application needs to empty its ByteBuffer before progressing, use our helper + * method {@link StorageNonBlockingChannelUtils#blockingEmptyTo(ByteBuffer, + * WritableByteChannel)} like so: + * + *
{@code + * try (AppendableUploadWriteableByteChannel channel = session.open()) { + * int written = StorageNonBlockingChannelUtils.blockingEmptyTo(byteBuffer, channel); + * } + * }+ * * @param src The buffer from which bytes are to be retrieved * @return The number of bytes written, possibly zero * @throws ClosedChannelException If this channel is closed diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java index 32609ebfc4..c82f87e2c2 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java @@ -159,33 +159,11 @@ static int alignSize(int size, int alignmentMultiple) { } static int fillFrom(ByteBuffer buf, ReadableByteChannel c) throws IOException { - int total = 0; - while (buf.hasRemaining()) { - int read = c.read(buf); - if (read != -1) { - total += read; - } else if (total == 0) { - return -1; - } else { - break; - } - } - return total; + return StorageNonBlockingChannelUtils.blockingFillFrom(buf, c); } static int emptyTo(ByteBuffer buf, WritableByteChannel c) throws IOException { - int total = 0; - while (buf.hasRemaining()) { - int written = c.write(buf); - if (written != -1) { - total += written; - } else if (total == 0) { - return -1; - } else { - break; - } - } - return total; + return StorageNonBlockingChannelUtils.blockingEmptyTo(buf, c); } static long totalRemaining(ByteBuffer[] buffers, int offset, int length) { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageNonBlockingChannelUtils.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageNonBlockingChannelUtils.java new file mode 100644 index 0000000000..d913587552 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageNonBlockingChannelUtils.java @@ -0,0 +1,79 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; + +/** + * Set of utility methods for working with non-blocking channels returned by this library. + * + * @since 2.56.0 + */ +public final class StorageNonBlockingChannelUtils { + + private StorageNonBlockingChannelUtils() {} + + /** + * Attempt to fill {@code buf} from {@code c}, blocking the invoking thread if necessary in order + * to do so. + * + *
This method will not close {@code c}. + * + * @return The number of bytes read, possibly zero, or {@code -1} if the channel has reached + * end-of-stream + * @throws IOException any IOException from calling {@link ReadableByteChannel#read(ByteBuffer)} + * @since 2.56.0 + */ + public static int blockingFillFrom(ByteBuffer buf, ReadableByteChannel c) throws IOException { + int total = 0; + while (buf.hasRemaining()) { + int read = c.read(buf); + if (read != -1) { + total += read; + } else if (total == 0) { + return -1; + } else { + break; + } + } + return total; + } + + /** + * Attempt to empty {@code buf} to {@code c}, blocking the invoking thread if necessary in order + * to do so. + * + *
This method will not close {@code c} + * + * @return The number of bytes written, possibly zero + * @throws IOException any IOException from calling {@link WritableByteChannel#write(ByteBuffer)} + * @since 2.56.0 + */ + public static int blockingEmptyTo(ByteBuffer buf, WritableByteChannel c) throws IOException { + int total = 0; + while (buf.hasRemaining()) { + int written = c.write(buf); + if (written != 0) { + total += written; + } + } + return total; + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BuffersTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BuffersTest.java index 3721dbc48f..f0d760cb85 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BuffersTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BuffersTest.java @@ -16,15 +16,10 @@ package com.google.cloud.storage; -import static com.google.cloud.storage.TestUtils.assertAll; -import static com.google.cloud.storage.TestUtils.xxd; import static com.google.common.truth.Truth.assertThat; -import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; import java.security.SecureRandom; -import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; public final class BuffersTest { @@ -77,87 +72,4 @@ public void allocateAligned_evenlyDivisible_capacityGtAlignment() { ByteBuffer b1 = Buffers.allocateAligned(8, 4); assertThat(b1.capacity()).isEqualTo(8); } - - @Test - public void fillFrom_handles_0SizeRead_someBytesRead() throws Exception { - byte[] bytes = new byte[14]; - ByteBuffer buf = ByteBuffer.wrap(bytes); - - byte[] expected = - new byte[] { - (byte) 'A', - (byte) 'B', - (byte) 'C', - (byte) 'A', - (byte) 'B', - (byte) 'A', - (byte) 'A', - (byte) 'A', - (byte) 'B', - (byte) 'A', - (byte) 'B', - (byte) 'C', - (byte) 0, - (byte) 0 - }; - - int[] acceptSequence = new int[] {3, 2, 1, 0, 0, 1, 2, 3}; - AtomicInteger readCount = new AtomicInteger(0); - - ReadableByteChannel c = - new ReadableByteChannel() { - @Override - public int read(ByteBuffer dst) throws IOException { - int i = readCount.getAndIncrement(); - if (i == acceptSequence.length) { - return -1; - } - int bytesToRead = acceptSequence[i]; - if (bytesToRead > 0) { - long copy = - Buffers.copy(DataGenerator.base64Characters().genByteBuffer(bytesToRead), dst); - assertThat(copy).isEqualTo(bytesToRead); - } - - return bytesToRead; - } - - @Override - public boolean isOpen() { - return true; - } - - @Override - public void close() throws IOException {} - }; - int filled = Buffers.fillFrom(buf, c); - - assertAll( - () -> assertThat(filled).isEqualTo(12), - () -> assertThat(xxd(bytes)).isEqualTo(xxd(expected))); - } - - @Test - public void fillFrom_handles_0SizeRead_noBytesRead() throws Exception { - ByteBuffer buf = ByteBuffer.allocate(3); - - ReadableByteChannel c = - new ReadableByteChannel() { - @Override - public int read(ByteBuffer dst) throws IOException { - return -1; - } - - @Override - public boolean isOpen() { - return true; - } - - @Override - public void close() throws IOException {} - }; - int filled = Buffers.fillFrom(buf, c); - - assertThat(filled).isEqualTo(-1); - } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageNonBlockingChannelUtilsTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageNonBlockingChannelUtilsTest.java new file mode 100644 index 0000000000..7fa86b61ec --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageNonBlockingChannelUtilsTest.java @@ -0,0 +1,270 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.StorageNonBlockingChannelUtils.blockingEmptyTo; +import static com.google.cloud.storage.StorageNonBlockingChannelUtils.blockingFillFrom; +import static com.google.cloud.storage.TestUtils.assertAll; +import static com.google.cloud.storage.TestUtils.xxd; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; + +public final class StorageNonBlockingChannelUtilsTest { + + @Test + public void emptyTo_fullyConsumed() throws Exception { + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(16); + AtomicInteger writeInvocationCount = new AtomicInteger(0); + int written = + blockingEmptyTo( + buf, + new SimpleWritableByteChannel() { + @Override + public int write(ByteBuffer src) { + int i = writeInvocationCount.getAndIncrement(); + if (i % 2 == 0) { + return 0; + } else { + src.get(); + return 1; + } + } + }); + assertAll( + () -> assertThat(written).isEqualTo(16), + () -> assertThat(writeInvocationCount.get()).isEqualTo(32), + () -> assertThat(buf.hasRemaining()).isFalse()); + } + + @Test + public void emptyTo_errorPropagated() throws Exception { + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(16); + AtomicInteger writeInvocationCount = new AtomicInteger(0); + IOException ioException = + assertThrows( + IOException.class, + () -> + blockingEmptyTo( + buf, + new SimpleWritableByteChannel() { + @Override + public int write(ByteBuffer src) throws IOException { + int i = writeInvocationCount.incrementAndGet(); + if (i == 0) { + return 0; + } else if (i == 3) { + throw new IOException("boom boom"); + } else { + src.get(); + return 1; + } + } + })); + assertAll( + () -> assertThat(ioException).hasMessageThat().isEqualTo("boom boom"), + () -> assertThat(writeInvocationCount.get()).isEqualTo(3), + () -> assertThat(buf.position()).isEqualTo(2)); + } + + @Test + public void fillFrom_fullyConsumed_dstGtEq_data() throws Exception { + ByteBuffer data = DataGenerator.base64Characters().genByteBuffer(16); + AtomicInteger readInvocationCount = new AtomicInteger(0); + ByteBuffer buf = ByteBuffer.allocate(32); + int read = + blockingFillFrom( + buf, + new SimpleReadableByteChannel() { + @Override + public int read(ByteBuffer dst) { + readInvocationCount.getAndIncrement(); + if (!data.hasRemaining()) { + return -1; + } else { + dst.put(data.get()); + return 1; + } + } + }); + assertAll( + () -> assertThat(read).isEqualTo(16), + () -> assertThat(readInvocationCount.get()).isEqualTo(16 + 1), // + 1 to read EOF + () -> assertThat(data.hasRemaining()).isFalse(), + () -> assertThat(buf.position()).isEqualTo(16)); + } + + @Test + public void fillFrom_fullyConsumed_dstLt_data() throws Exception { + ByteBuffer data = DataGenerator.base64Characters().genByteBuffer(16); + AtomicInteger readInvocationCount = new AtomicInteger(0); + ByteBuffer buf = ByteBuffer.allocate(8); + int read = + blockingFillFrom( + buf, + new SimpleReadableByteChannel() { + @Override + public int read(ByteBuffer dst) { + readInvocationCount.getAndIncrement(); + if (!data.hasRemaining()) { + return -1; + } else { + dst.put(data.get()); + return 1; + } + } + }); + assertAll( + () -> assertThat(read).isEqualTo(8), + () -> assertThat(readInvocationCount.get()).isEqualTo(8), + () -> assertThat(data.hasRemaining()).isTrue(), + () -> assertThat(buf.position()).isEqualTo(8)); + } + + @Test + public void fillFrom_eofPropagated() throws Exception { + AtomicInteger readInvocationCount = new AtomicInteger(0); + ByteBuffer buf = ByteBuffer.allocate(8); + int read = + blockingFillFrom( + buf, + new SimpleReadableByteChannel() { + @Override + public int read(ByteBuffer dst) { + readInvocationCount.getAndIncrement(); + return -1; + } + }); + assertAll( + () -> assertThat(read).isEqualTo(-1), + () -> assertThat(readInvocationCount.get()).isEqualTo(1), + () -> assertThat(buf.position()).isEqualTo(0)); + } + + @Test + public void fillFrom_errorPropagated() throws Exception { + ByteBuffer data = DataGenerator.base64Characters().genByteBuffer(16); + AtomicInteger readInvocationCount = new AtomicInteger(0); + ByteBuffer buf = ByteBuffer.allocate(32); + IOException ioException = + assertThrows( + IOException.class, + () -> + blockingFillFrom( + buf, + new SimpleReadableByteChannel() { + @Override + public int read(ByteBuffer dst) throws IOException { + int i = readInvocationCount.incrementAndGet(); + if (i == 0) { + return 0; + } else if (i == 3) { + throw new IOException("boom boom"); + } else { + dst.put(data.get()); + return 1; + } + } + })); + assertAll( + () -> assertThat(ioException).hasMessageThat().isEqualTo("boom boom"), + () -> assertThat(readInvocationCount.get()).isEqualTo(3), + () -> assertThat(buf.position()).isEqualTo(2), + () -> assertThat(buf.position()).isEqualTo(2)); + } + + @Test + public void fillFrom_handles_0SizeRead_someBytesRead() throws Exception { + byte[] bytes = new byte[14]; + ByteBuffer buf = ByteBuffer.wrap(bytes); + + byte[] expected = + new byte[] { + (byte) 'A', + (byte) 'B', + (byte) 'C', + (byte) 'A', + (byte) 'B', + (byte) 'A', + (byte) 'A', + (byte) 'A', + (byte) 'B', + (byte) 'A', + (byte) 'B', + (byte) 'C', + (byte) 0, + (byte) 0 + }; + + int[] acceptSequence = new int[] {3, 2, 1, 0, 0, 1, 2, 3}; + AtomicInteger readCount = new AtomicInteger(0); + + int filled = + blockingFillFrom( + buf, + new SimpleReadableByteChannel() { + @Override + public int read(ByteBuffer dst) { + int i = readCount.getAndIncrement(); + if (i == acceptSequence.length) { + return -1; + } + int bytesToRead = acceptSequence[i]; + if (bytesToRead > 0) { + long copy = + Buffers.copy( + DataGenerator.base64Characters().genByteBuffer(bytesToRead), dst); + assertThat(copy).isEqualTo(bytesToRead); + } + + return bytesToRead; + } + }); + + assertAll( + () -> assertThat(filled).isEqualTo(12), + () -> assertThat(xxd(bytes)).isEqualTo(xxd(expected))); + } + + private abstract static class SimpleWritableByteChannel implements WritableByteChannel { + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() {} + } + + private abstract static class SimpleReadableByteChannel implements ReadableByteChannel { + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() {} + } +}