From 9ba1625d6c7fdcbc1efedb7e551dc95c44967da2 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Tue, 12 Aug 2025 18:11:26 -0400 Subject: [PATCH 1/5] chore: make BlobAppendableUpload.AppendableUploadWriteableByteChannel#write non-blocking even when accessed from multiple threads Rather than blocking to acquire the lock, we try to acquire the lock, if acquired proceed otherwise return 0 immediately. --- .../java/com/google/cloud/storage/BlobAppendableUpload.java | 4 ---- .../com/google/cloud/storage/BlobAppendableUploadImpl.java | 6 ++++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java index 0a81c36a7c..ce7ae228a6 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java @@ -107,10 +107,6 @@ interface AppendableUploadWriteableByteChannel extends WritableByteChannel { * .}{@link FlushPolicy#getMaxPendingBytes() getMaxPendingBytes()}. If the outbound queue is * full, and can not fit more bytes, this method will return 0. * - *

This method may be invoked at any time. If another thread has already initiated a write - * operation upon this channel, however, then an invocation of this method will block until the - * first operation is complete. - * *

If your application needs to empty its ByteBuffer before progressing, use our helper * method {@link StorageNonBlockingChannelUtils#blockingEmptyTo(ByteBuffer, * WritableByteChannel)} like so: diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadImpl.java index 2b10ea3cf9..909d11dfa2 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadImpl.java @@ -89,7 +89,10 @@ public void flush() throws IOException { @Override public int write(ByteBuffer src) throws IOException { - lock.lock(); + boolean locked = lock.tryLock(); + if (!locked) { + return 0; + } try { return buffered.write(src); } finally { @@ -99,7 +102,6 @@ public int write(ByteBuffer src) throws IOException { @Override public boolean isOpen() { - lock.lock(); try { return buffered.isOpen(); } finally { From 2db2592e9ee73cb01babbfc4b2f4afa770c8fb8a Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Mon, 18 Aug 2025 15:27:14 -0400 Subject: [PATCH 2/5] chore: update bidi grpc calls to request 1 message initially instead of 2 Some cursory tracing and log inspection showed that initially requesting 2 instead of 1 didn't improve performance at all, but did make downstream tracers harder to reason about. --- .../com/google/cloud/storage/BidiUploadStreamingStream.java | 2 +- .../java/com/google/cloud/storage/ObjectReadSessionStream.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java index 6e210bc86c..93eba67734 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java @@ -382,7 +382,7 @@ static final class StreamingResponseObserver public void onStart(StreamController controller) { this.controller = controller; controller.disableAutoInboundFlowControl(); - controller.request(2); + controller.request(1); } @RequiresNonNull("controller") diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java index 37e78198b2..6f02b16866 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java @@ -257,7 +257,7 @@ private BidiReadObjectResponseObserver() {} public void onStart(StreamController controller) { ObjectReadSessionStream.this.controller = controller; controller.disableAutoInboundFlowControl(); - controller.request(2); + controller.request(1); } @SuppressWarnings("rawtypes") From 934e1a940eff3a82275485c7345cf47b0973f2ca Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Mon, 18 Aug 2025 15:29:37 -0400 Subject: [PATCH 3/5] chore: add comment to BidiAppendableUnbufferedWritableByteChannel#writehAndClose to explain why we are blocking --- .../BidiAppendableUnbufferedWritableByteChannel.java | 3 +++ .../java/com/google/cloud/storage/BlobAppendableUpload.java | 5 ++--- .../src/main/java/com/google/cloud/storage/Buffers.java | 4 ++-- ...onBlockingChannelUtils.java => StorageChannelUtils.java} | 4 ++-- ...ngChannelUtilsTest.java => StorageChannelUtilsTest.java} | 6 +++--- 5 files changed, 12 insertions(+), 10 deletions(-) rename google-cloud-storage/src/main/java/com/google/cloud/storage/{StorageNonBlockingChannelUtils.java => StorageChannelUtils.java} (95%) rename google-cloud-storage/src/test/java/com/google/cloud/storage/{StorageNonBlockingChannelUtilsTest.java => StorageChannelUtilsTest.java} (97%) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java index d24bd04dad..af3e37ba78 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java @@ -58,6 +58,9 @@ public long write(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOEx @Override public long writeAndClose(ByteBuffer[] srcs, int offset, int length) throws IOException { long totalRemaining = Buffers.totalRemaining(srcs, offset, length); + // internalWrite is non-blocking, but close is blocking. + // loop here to ensure all the bytes we need flush are enqueued before we transition to trying + // to close. long written = 0; do { written += internalWrite(srcs, offset, length); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java index ce7ae228a6..e6f9167ac7 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java @@ -108,12 +108,11 @@ interface AppendableUploadWriteableByteChannel extends WritableByteChannel { * full, and can not fit more bytes, this method will return 0. * *

If your application needs to empty its ByteBuffer before progressing, use our helper - * method {@link StorageNonBlockingChannelUtils#blockingEmptyTo(ByteBuffer, - * WritableByteChannel)} like so: + * method {@link StorageChannelUtils#blockingEmptyTo(ByteBuffer, WritableByteChannel)} like so: * *

{@code
      * try (AppendableUploadWriteableByteChannel channel = session.open()) {
-     *   int written = StorageNonBlockingChannelUtils.blockingEmptyTo(byteBuffer, channel);
+     *   int written = StorageChannelUtils.blockingEmptyTo(byteBuffer, channel);
      * }
      * }
* diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java index c82f87e2c2..21d8c2ed98 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java @@ -159,11 +159,11 @@ static int alignSize(int size, int alignmentMultiple) { } static int fillFrom(ByteBuffer buf, ReadableByteChannel c) throws IOException { - return StorageNonBlockingChannelUtils.blockingFillFrom(buf, c); + return StorageChannelUtils.blockingFillFrom(buf, c); } static int emptyTo(ByteBuffer buf, WritableByteChannel c) throws IOException { - return StorageNonBlockingChannelUtils.blockingEmptyTo(buf, c); + return StorageChannelUtils.blockingEmptyTo(buf, c); } static long totalRemaining(ByteBuffer[] buffers, int offset, int length) { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageNonBlockingChannelUtils.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageChannelUtils.java similarity index 95% rename from google-cloud-storage/src/main/java/com/google/cloud/storage/StorageNonBlockingChannelUtils.java rename to google-cloud-storage/src/main/java/com/google/cloud/storage/StorageChannelUtils.java index d913587552..d720591045 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageNonBlockingChannelUtils.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageChannelUtils.java @@ -26,9 +26,9 @@ * * @since 2.56.0 */ -public final class StorageNonBlockingChannelUtils { +public final class StorageChannelUtils { - private StorageNonBlockingChannelUtils() {} + private StorageChannelUtils() {} /** * Attempt to fill {@code buf} from {@code c}, blocking the invoking thread if necessary in order diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageNonBlockingChannelUtilsTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageChannelUtilsTest.java similarity index 97% rename from google-cloud-storage/src/test/java/com/google/cloud/storage/StorageNonBlockingChannelUtilsTest.java rename to google-cloud-storage/src/test/java/com/google/cloud/storage/StorageChannelUtilsTest.java index 7fa86b61ec..5c6d87c164 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageNonBlockingChannelUtilsTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageChannelUtilsTest.java @@ -16,8 +16,8 @@ package com.google.cloud.storage; -import static com.google.cloud.storage.StorageNonBlockingChannelUtils.blockingEmptyTo; -import static com.google.cloud.storage.StorageNonBlockingChannelUtils.blockingFillFrom; +import static com.google.cloud.storage.StorageChannelUtils.blockingEmptyTo; +import static com.google.cloud.storage.StorageChannelUtils.blockingFillFrom; import static com.google.cloud.storage.TestUtils.assertAll; import static com.google.cloud.storage.TestUtils.xxd; import static com.google.common.truth.Truth.assertThat; @@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; -public final class StorageNonBlockingChannelUtilsTest { +public final class StorageChannelUtilsTest { @Test public void emptyTo_fullyConsumed() throws Exception { From 7c919dcbb25a0448775a7906264acc5638df5b5e Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Mon, 18 Aug 2025 17:15:10 -0400 Subject: [PATCH 4/5] chore: add comment to BidiUploadStreamingStream class --- .../google/cloud/storage/BidiUploadStreamingStream.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java index 93eba67734..885591e6b5 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java @@ -47,6 +47,14 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.checker.nullness.qual.RequiresNonNull; +/** + * A class that helps tie together a {@link BidiUploadState}, {@link RetryContext} and underlying + * gRPC bidi stream. + * + *

This class helps transparently handle retries in the event an error is observed, and will + * handle redirect(s) if they occur, all without the need for the caller of this class to know about + * those things and the state need to worry about how retries will happen. + */ final class BidiUploadStreamingStream { private final BidiUploadState state; From 70b6b789e5adda37dce5bda7d67ef4e13a79f4de Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Mon, 18 Aug 2025 17:15:43 -0400 Subject: [PATCH 5/5] chore: make maxRedirectsAllowed a configurable parameter of BlobAppendableUploadConfig --- .../storage/BlobAppendableUploadConfig.java | 74 ++++++++++++++++--- 1 file changed, 64 insertions(+), 10 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java index 8c9c99636c..a1b1ac7a85 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java @@ -32,8 +32,10 @@ import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; import com.google.storage.v2.BidiWriteObjectResponse; import com.google.storage.v2.ServiceConstants.Values; +import java.util.Objects; import java.util.function.BiFunction; import javax.annotation.concurrent.Immutable; @@ -52,14 +54,17 @@ public final class BlobAppendableUploadConfig { private static final BlobAppendableUploadConfig INSTANCE = new BlobAppendableUploadConfig( - FlushPolicy.minFlushSize(_256KiB), CloseAction.CLOSE_WITHOUT_FINALIZING); + FlushPolicy.minFlushSize(_256KiB), CloseAction.CLOSE_WITHOUT_FINALIZING, 3); private final FlushPolicy flushPolicy; private final CloseAction closeAction; + private final int maxRedirectsAllowed; - private BlobAppendableUploadConfig(FlushPolicy flushPolicy, CloseAction closeAction) { + private BlobAppendableUploadConfig( + FlushPolicy flushPolicy, CloseAction closeAction, int maxRedirectsAllowed) { this.flushPolicy = flushPolicy; this.closeAction = closeAction; + this.maxRedirectsAllowed = maxRedirectsAllowed; } /** @@ -90,7 +95,7 @@ public BlobAppendableUploadConfig withFlushPolicy(FlushPolicy flushPolicy) { if (this.flushPolicy.equals(flushPolicy)) { return this; } - return new BlobAppendableUploadConfig(flushPolicy, closeAction); + return new BlobAppendableUploadConfig(flushPolicy, closeAction, maxRedirectsAllowed); } /** @@ -108,8 +113,9 @@ public CloseAction getCloseAction() { } /** - * Return an instance with the {@code CloseAction} set to be the specified value. Default: - * {@link CloseAction#CLOSE_WITHOUT_FINALIZING} + * Return an instance with the {@code CloseAction} set to be the specified value. + * + *

Default: {@link CloseAction#CLOSE_WITHOUT_FINALIZING} * * @see #getCloseAction() * @since 2.51.0 This new api is in preview and is subject to breaking changes. @@ -120,14 +126,65 @@ public BlobAppendableUploadConfig withCloseAction(CloseAction closeAction) { if (this.closeAction == closeAction) { return this; } - return new BlobAppendableUploadConfig(flushPolicy, closeAction); + return new BlobAppendableUploadConfig(flushPolicy, closeAction, maxRedirectsAllowed); + } + + /** + * The {@code maxRedirectsAllowed} set to be the specified value. + * + *

Default: 3 + * + * @see #withMaxRedirectsAllowed(int) + * @since 2.56.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + int getMaxRedirectsAllowed() { + return maxRedirectsAllowed; + } + + /** + * Return an instance with the {@code maxRedirectsAllowed} set to be the specified value. + * + *

Default: 3 + * + * @see #getMaxRedirectsAllowed() + * @since 2.56.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + BlobAppendableUploadConfig withMaxRedirectsAllowed(int maxRedirectsAllowed) { + Preconditions.checkArgument( + maxRedirectsAllowed >= 0, "maxRedirectsAllowed >= 0 (%s >= 0)", maxRedirectsAllowed); + if (this.maxRedirectsAllowed == maxRedirectsAllowed) { + return this; + } + return new BlobAppendableUploadConfig(flushPolicy, closeAction, maxRedirectsAllowed); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof BlobAppendableUploadConfig)) { + return false; + } + BlobAppendableUploadConfig that = (BlobAppendableUploadConfig) o; + return maxRedirectsAllowed == that.maxRedirectsAllowed + && Objects.equals(flushPolicy, that.flushPolicy) + && closeAction == that.closeAction; + } + + @Override + public int hashCode() { + return Objects.hash(flushPolicy, closeAction, maxRedirectsAllowed); } @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("closeAction", closeAction) .add("flushPolicy", flushPolicy) + .add("closeAction", closeAction) + .add("maxRedirectsAllowed", maxRedirectsAllowed) .toString(); } @@ -183,9 +240,6 @@ public enum CloseAction { } BlobAppendableUpload create(GrpcStorageImpl storage, BlobInfo info, Opts opts) { - // TODO: make configurable - int maxRedirectsAllowed = 3; - long maxPendingBytes = this.getFlushPolicy().getMaxPendingBytes(); AppendableUploadState state = storage.getAppendableState(info, opts, maxPendingBytes); WritableByteChannelSession