diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java index d24bd04dad..af3e37ba78 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java @@ -58,6 +58,9 @@ public long write(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOEx @Override public long writeAndClose(ByteBuffer[] srcs, int offset, int length) throws IOException { long totalRemaining = Buffers.totalRemaining(srcs, offset, length); + // internalWrite is non-blocking, but close is blocking. + // loop here to ensure all the bytes we need flush are enqueued before we transition to trying + // to close. long written = 0; do { written += internalWrite(srcs, offset, length); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java index 6e210bc86c..885591e6b5 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java @@ -47,6 +47,14 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.checker.nullness.qual.RequiresNonNull; +/** + * A class that helps tie together a {@link BidiUploadState}, {@link RetryContext} and underlying + * gRPC bidi stream. + * + *

This class helps transparently handle retries in the event an error is observed, and will + * handle redirect(s) if they occur, all without the need for the caller of this class to know about + * those things and the state need to worry about how retries will happen. + */ final class BidiUploadStreamingStream { private final BidiUploadState state; @@ -382,7 +390,7 @@ static final class StreamingResponseObserver public void onStart(StreamController controller) { this.controller = controller; controller.disableAutoInboundFlowControl(); - controller.request(2); + controller.request(1); } @RequiresNonNull("controller") diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java index 0a81c36a7c..e6f9167ac7 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java @@ -107,17 +107,12 @@ interface AppendableUploadWriteableByteChannel extends WritableByteChannel { * .}{@link FlushPolicy#getMaxPendingBytes() getMaxPendingBytes()}. If the outbound queue is * full, and can not fit more bytes, this method will return 0. * - *

This method may be invoked at any time. If another thread has already initiated a write - * operation upon this channel, however, then an invocation of this method will block until the - * first operation is complete. - * *

If your application needs to empty its ByteBuffer before progressing, use our helper - * method {@link StorageNonBlockingChannelUtils#blockingEmptyTo(ByteBuffer, - * WritableByteChannel)} like so: + * method {@link StorageChannelUtils#blockingEmptyTo(ByteBuffer, WritableByteChannel)} like so: * *

{@code
      * try (AppendableUploadWriteableByteChannel channel = session.open()) {
-     *   int written = StorageNonBlockingChannelUtils.blockingEmptyTo(byteBuffer, channel);
+     *   int written = StorageChannelUtils.blockingEmptyTo(byteBuffer, channel);
      * }
      * }
* diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java index 8c9c99636c..a1b1ac7a85 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java @@ -32,8 +32,10 @@ import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; import com.google.storage.v2.BidiWriteObjectResponse; import com.google.storage.v2.ServiceConstants.Values; +import java.util.Objects; import java.util.function.BiFunction; import javax.annotation.concurrent.Immutable; @@ -52,14 +54,17 @@ public final class BlobAppendableUploadConfig { private static final BlobAppendableUploadConfig INSTANCE = new BlobAppendableUploadConfig( - FlushPolicy.minFlushSize(_256KiB), CloseAction.CLOSE_WITHOUT_FINALIZING); + FlushPolicy.minFlushSize(_256KiB), CloseAction.CLOSE_WITHOUT_FINALIZING, 3); private final FlushPolicy flushPolicy; private final CloseAction closeAction; + private final int maxRedirectsAllowed; - private BlobAppendableUploadConfig(FlushPolicy flushPolicy, CloseAction closeAction) { + private BlobAppendableUploadConfig( + FlushPolicy flushPolicy, CloseAction closeAction, int maxRedirectsAllowed) { this.flushPolicy = flushPolicy; this.closeAction = closeAction; + this.maxRedirectsAllowed = maxRedirectsAllowed; } /** @@ -90,7 +95,7 @@ public BlobAppendableUploadConfig withFlushPolicy(FlushPolicy flushPolicy) { if (this.flushPolicy.equals(flushPolicy)) { return this; } - return new BlobAppendableUploadConfig(flushPolicy, closeAction); + return new BlobAppendableUploadConfig(flushPolicy, closeAction, maxRedirectsAllowed); } /** @@ -108,8 +113,9 @@ public CloseAction getCloseAction() { } /** - * Return an instance with the {@code CloseAction} set to be the specified value. Default: - * {@link CloseAction#CLOSE_WITHOUT_FINALIZING} + * Return an instance with the {@code CloseAction} set to be the specified value. + * + *

Default: {@link CloseAction#CLOSE_WITHOUT_FINALIZING} * * @see #getCloseAction() * @since 2.51.0 This new api is in preview and is subject to breaking changes. @@ -120,14 +126,65 @@ public BlobAppendableUploadConfig withCloseAction(CloseAction closeAction) { if (this.closeAction == closeAction) { return this; } - return new BlobAppendableUploadConfig(flushPolicy, closeAction); + return new BlobAppendableUploadConfig(flushPolicy, closeAction, maxRedirectsAllowed); + } + + /** + * The {@code maxRedirectsAllowed} set to be the specified value. + * + *

Default: 3 + * + * @see #withMaxRedirectsAllowed(int) + * @since 2.56.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + int getMaxRedirectsAllowed() { + return maxRedirectsAllowed; + } + + /** + * Return an instance with the {@code maxRedirectsAllowed} set to be the specified value. + * + *

Default: 3 + * + * @see #getMaxRedirectsAllowed() + * @since 2.56.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + BlobAppendableUploadConfig withMaxRedirectsAllowed(int maxRedirectsAllowed) { + Preconditions.checkArgument( + maxRedirectsAllowed >= 0, "maxRedirectsAllowed >= 0 (%s >= 0)", maxRedirectsAllowed); + if (this.maxRedirectsAllowed == maxRedirectsAllowed) { + return this; + } + return new BlobAppendableUploadConfig(flushPolicy, closeAction, maxRedirectsAllowed); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof BlobAppendableUploadConfig)) { + return false; + } + BlobAppendableUploadConfig that = (BlobAppendableUploadConfig) o; + return maxRedirectsAllowed == that.maxRedirectsAllowed + && Objects.equals(flushPolicy, that.flushPolicy) + && closeAction == that.closeAction; + } + + @Override + public int hashCode() { + return Objects.hash(flushPolicy, closeAction, maxRedirectsAllowed); } @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("closeAction", closeAction) .add("flushPolicy", flushPolicy) + .add("closeAction", closeAction) + .add("maxRedirectsAllowed", maxRedirectsAllowed) .toString(); } @@ -183,9 +240,6 @@ public enum CloseAction { } BlobAppendableUpload create(GrpcStorageImpl storage, BlobInfo info, Opts opts) { - // TODO: make configurable - int maxRedirectsAllowed = 3; - long maxPendingBytes = this.getFlushPolicy().getMaxPendingBytes(); AppendableUploadState state = storage.getAppendableState(info, opts, maxPendingBytes); WritableByteChannelSession diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadImpl.java index 2b10ea3cf9..909d11dfa2 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadImpl.java @@ -89,7 +89,10 @@ public void flush() throws IOException { @Override public int write(ByteBuffer src) throws IOException { - lock.lock(); + boolean locked = lock.tryLock(); + if (!locked) { + return 0; + } try { return buffered.write(src); } finally { @@ -99,7 +102,6 @@ public int write(ByteBuffer src) throws IOException { @Override public boolean isOpen() { - lock.lock(); try { return buffered.isOpen(); } finally { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java index c82f87e2c2..21d8c2ed98 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java @@ -159,11 +159,11 @@ static int alignSize(int size, int alignmentMultiple) { } static int fillFrom(ByteBuffer buf, ReadableByteChannel c) throws IOException { - return StorageNonBlockingChannelUtils.blockingFillFrom(buf, c); + return StorageChannelUtils.blockingFillFrom(buf, c); } static int emptyTo(ByteBuffer buf, WritableByteChannel c) throws IOException { - return StorageNonBlockingChannelUtils.blockingEmptyTo(buf, c); + return StorageChannelUtils.blockingEmptyTo(buf, c); } static long totalRemaining(ByteBuffer[] buffers, int offset, int length) { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java index 37e78198b2..6f02b16866 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java @@ -257,7 +257,7 @@ private BidiReadObjectResponseObserver() {} public void onStart(StreamController controller) { ObjectReadSessionStream.this.controller = controller; controller.disableAutoInboundFlowControl(); - controller.request(2); + controller.request(1); } @SuppressWarnings("rawtypes") diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageNonBlockingChannelUtils.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageChannelUtils.java similarity index 95% rename from google-cloud-storage/src/main/java/com/google/cloud/storage/StorageNonBlockingChannelUtils.java rename to google-cloud-storage/src/main/java/com/google/cloud/storage/StorageChannelUtils.java index d913587552..d720591045 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageNonBlockingChannelUtils.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageChannelUtils.java @@ -26,9 +26,9 @@ * * @since 2.56.0 */ -public final class StorageNonBlockingChannelUtils { +public final class StorageChannelUtils { - private StorageNonBlockingChannelUtils() {} + private StorageChannelUtils() {} /** * Attempt to fill {@code buf} from {@code c}, blocking the invoking thread if necessary in order diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageNonBlockingChannelUtilsTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageChannelUtilsTest.java similarity index 97% rename from google-cloud-storage/src/test/java/com/google/cloud/storage/StorageNonBlockingChannelUtilsTest.java rename to google-cloud-storage/src/test/java/com/google/cloud/storage/StorageChannelUtilsTest.java index 7fa86b61ec..5c6d87c164 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageNonBlockingChannelUtilsTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageChannelUtilsTest.java @@ -16,8 +16,8 @@ package com.google.cloud.storage; -import static com.google.cloud.storage.StorageNonBlockingChannelUtils.blockingEmptyTo; -import static com.google.cloud.storage.StorageNonBlockingChannelUtils.blockingFillFrom; +import static com.google.cloud.storage.StorageChannelUtils.blockingEmptyTo; +import static com.google.cloud.storage.StorageChannelUtils.blockingFillFrom; import static com.google.cloud.storage.TestUtils.assertAll; import static com.google.cloud.storage.TestUtils.xxd; import static com.google.common.truth.Truth.assertThat; @@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; -public final class StorageNonBlockingChannelUtilsTest { +public final class StorageChannelUtilsTest { @Test public void emptyTo_fullyConsumed() throws Exception {