Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public long write(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOEx
@Override
public long writeAndClose(ByteBuffer[] srcs, int offset, int length) throws IOException {
long totalRemaining = Buffers.totalRemaining(srcs, offset, length);
// internalWrite is non-blocking, but close is blocking.
// loop here to ensure all the bytes we need flush are enqueued before we transition to trying
// to close.
long written = 0;
do {
written += internalWrite(srcs, offset, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.RequiresNonNull;

/**
* A class that helps tie together a {@link BidiUploadState}, {@link RetryContext} and underlying
* gRPC bidi stream.
*
* <p>This class helps transparently handle retries in the event an error is observed, and will
* handle redirect(s) if they occur, all without the need for the caller of this class to know about
* those things and the state need to worry about how retries will happen.
*/
final class BidiUploadStreamingStream {

private final BidiUploadState state;
Expand Down Expand Up @@ -382,7 +390,7 @@ static final class StreamingResponseObserver
public void onStart(StreamController controller) {
this.controller = controller;
controller.disableAutoInboundFlowControl();
controller.request(2);
controller.request(1);
}

@RequiresNonNull("controller")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,12 @@ interface AppendableUploadWriteableByteChannel extends WritableByteChannel {
* .}{@link FlushPolicy#getMaxPendingBytes() getMaxPendingBytes()}. If the outbound queue is
* full, and can not fit more bytes, this method will return 0.
*
* <p>This method may be invoked at any time. If another thread has already initiated a write
* operation upon this channel, however, then an invocation of this method will block until the
* first operation is complete.
*
* <p>If your application needs to empty its ByteBuffer before progressing, use our helper
* method {@link StorageNonBlockingChannelUtils#blockingEmptyTo(ByteBuffer,
* WritableByteChannel)} like so:
* method {@link StorageChannelUtils#blockingEmptyTo(ByteBuffer, WritableByteChannel)} like so:
*
* <pre>{@code
* try (AppendableUploadWriteableByteChannel channel = session.open()) {
* int written = StorageNonBlockingChannelUtils.blockingEmptyTo(byteBuffer, channel);
* int written = StorageChannelUtils.blockingEmptyTo(byteBuffer, channel);
* }
* }</pre>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.storage.v2.BidiWriteObjectResponse;
import com.google.storage.v2.ServiceConstants.Values;
import java.util.Objects;
import java.util.function.BiFunction;
import javax.annotation.concurrent.Immutable;

Expand All @@ -52,14 +54,17 @@ public final class BlobAppendableUploadConfig {

private static final BlobAppendableUploadConfig INSTANCE =
new BlobAppendableUploadConfig(
FlushPolicy.minFlushSize(_256KiB), CloseAction.CLOSE_WITHOUT_FINALIZING);
FlushPolicy.minFlushSize(_256KiB), CloseAction.CLOSE_WITHOUT_FINALIZING, 3);

private final FlushPolicy flushPolicy;
private final CloseAction closeAction;
private final int maxRedirectsAllowed;

private BlobAppendableUploadConfig(FlushPolicy flushPolicy, CloseAction closeAction) {
private BlobAppendableUploadConfig(
FlushPolicy flushPolicy, CloseAction closeAction, int maxRedirectsAllowed) {
this.flushPolicy = flushPolicy;
this.closeAction = closeAction;
this.maxRedirectsAllowed = maxRedirectsAllowed;
}

/**
Expand Down Expand Up @@ -90,7 +95,7 @@ public BlobAppendableUploadConfig withFlushPolicy(FlushPolicy flushPolicy) {
if (this.flushPolicy.equals(flushPolicy)) {
return this;
}
return new BlobAppendableUploadConfig(flushPolicy, closeAction);
return new BlobAppendableUploadConfig(flushPolicy, closeAction, maxRedirectsAllowed);
}

/**
Expand All @@ -108,8 +113,9 @@ public CloseAction getCloseAction() {
}

/**
* Return an instance with the {@code CloseAction} set to be the specified value. <i>Default:</i>
* {@link CloseAction#CLOSE_WITHOUT_FINALIZING}
* Return an instance with the {@code CloseAction} set to be the specified value.
*
* <p><i>Default:</i> {@link CloseAction#CLOSE_WITHOUT_FINALIZING}
*
* @see #getCloseAction()
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
Expand All @@ -120,14 +126,65 @@ public BlobAppendableUploadConfig withCloseAction(CloseAction closeAction) {
if (this.closeAction == closeAction) {
return this;
}
return new BlobAppendableUploadConfig(flushPolicy, closeAction);
return new BlobAppendableUploadConfig(flushPolicy, closeAction, maxRedirectsAllowed);
}

/**
* The {@code maxRedirectsAllowed} set to be the specified value.
*
* <p><i>Default:</i> 3
*
* @see #withMaxRedirectsAllowed(int)
* @since 2.56.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
int getMaxRedirectsAllowed() {
return maxRedirectsAllowed;
}

/**
* Return an instance with the {@code maxRedirectsAllowed} set to be the specified value.
*
* <p><i>Default:</i> 3
*
* @see #getMaxRedirectsAllowed()
* @since 2.56.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
BlobAppendableUploadConfig withMaxRedirectsAllowed(int maxRedirectsAllowed) {
Preconditions.checkArgument(
maxRedirectsAllowed >= 0, "maxRedirectsAllowed >= 0 (%s >= 0)", maxRedirectsAllowed);
if (this.maxRedirectsAllowed == maxRedirectsAllowed) {
return this;
}
return new BlobAppendableUploadConfig(flushPolicy, closeAction, maxRedirectsAllowed);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof BlobAppendableUploadConfig)) {
return false;
}
BlobAppendableUploadConfig that = (BlobAppendableUploadConfig) o;
return maxRedirectsAllowed == that.maxRedirectsAllowed
&& Objects.equals(flushPolicy, that.flushPolicy)
&& closeAction == that.closeAction;
}

@Override
public int hashCode() {
return Objects.hash(flushPolicy, closeAction, maxRedirectsAllowed);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("closeAction", closeAction)
.add("flushPolicy", flushPolicy)
.add("closeAction", closeAction)
.add("maxRedirectsAllowed", maxRedirectsAllowed)
.toString();
}

Expand Down Expand Up @@ -183,9 +240,6 @@ public enum CloseAction {
}

BlobAppendableUpload create(GrpcStorageImpl storage, BlobInfo info, Opts<ObjectTargetOpt> opts) {
// TODO: make configurable
int maxRedirectsAllowed = 3;

long maxPendingBytes = this.getFlushPolicy().getMaxPendingBytes();
AppendableUploadState state = storage.getAppendableState(info, opts, maxPendingBytes);
WritableByteChannelSession<AppendableObjectBufferedWritableByteChannel, BidiWriteObjectResponse>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ public void flush() throws IOException {

@Override
public int write(ByteBuffer src) throws IOException {
lock.lock();
boolean locked = lock.tryLock();
if (!locked) {
return 0;
}
try {
return buffered.write(src);
} finally {
Expand All @@ -99,7 +102,6 @@ public int write(ByteBuffer src) throws IOException {

@Override
public boolean isOpen() {
lock.lock();
try {
return buffered.isOpen();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ static int alignSize(int size, int alignmentMultiple) {
}

static int fillFrom(ByteBuffer buf, ReadableByteChannel c) throws IOException {
return StorageNonBlockingChannelUtils.blockingFillFrom(buf, c);
return StorageChannelUtils.blockingFillFrom(buf, c);
}

static int emptyTo(ByteBuffer buf, WritableByteChannel c) throws IOException {
return StorageNonBlockingChannelUtils.blockingEmptyTo(buf, c);
return StorageChannelUtils.blockingEmptyTo(buf, c);
}

static long totalRemaining(ByteBuffer[] buffers, int offset, int length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ private BidiReadObjectResponseObserver() {}
public void onStart(StreamController controller) {
ObjectReadSessionStream.this.controller = controller;
controller.disableAutoInboundFlowControl();
controller.request(2);
controller.request(1);
}

@SuppressWarnings("rawtypes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
*
* @since 2.56.0
*/
public final class StorageNonBlockingChannelUtils {
public final class StorageChannelUtils {

private StorageNonBlockingChannelUtils() {}
private StorageChannelUtils() {}

/**
* Attempt to fill {@code buf} from {@code c}, blocking the invoking thread if necessary in order
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package com.google.cloud.storage;

import static com.google.cloud.storage.StorageNonBlockingChannelUtils.blockingEmptyTo;
import static com.google.cloud.storage.StorageNonBlockingChannelUtils.blockingFillFrom;
import static com.google.cloud.storage.StorageChannelUtils.blockingEmptyTo;
import static com.google.cloud.storage.StorageChannelUtils.blockingFillFrom;
import static com.google.cloud.storage.TestUtils.assertAll;
import static com.google.cloud.storage.TestUtils.xxd;
import static com.google.common.truth.Truth.assertThat;
Expand All @@ -30,7 +30,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;

public final class StorageNonBlockingChannelUtilsTest {
public final class StorageChannelUtilsTest {

@Test
public void emptyTo_fullyConsumed() throws Exception {
Expand Down
Loading