Skip to content

Commit c0febee

Browse files
committed
chore: new bidi appendable channel bootstrappable
* Add FlushPolicy.MinFlushSizeFlushPolicy.maxPendingBytes * Add new contructor to ChannelSession to allow the resultFuture to be passed in. * Create temporary BlobAppendableUploadImpl.AppendableUnbufferedWritableByteChannel to allow both old and new implementations to be passed to BlobAppendableUploadImpl -- this interface will be removed in cleanup * Add new implementation using streaming buffer management as a sibling to the existing implementation. The existing implementation will be removed entirely in cleanup. * Refactored tests to follow
1 parent 555ee59 commit c0febee

9 files changed

+264
-71
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package com.google.cloud.storage;
1818

1919
import com.google.cloud.BaseServiceException;
20+
import com.google.cloud.storage.BlobAppendableUploadImpl.AppendableUnbufferedWritableByteChannel;
2021
import com.google.cloud.storage.ChunkSegmenter.ChunkSegment;
21-
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
2222
import java.io.IOException;
2323
import java.io.InterruptedIOException;
2424
import java.nio.ByteBuffer;
@@ -27,7 +27,8 @@
2727
import java.util.concurrent.TimeUnit;
2828
import java.util.concurrent.TimeoutException;
2929

30-
final class BidiAppendableUnbufferedWritableByteChannel implements UnbufferedWritableByteChannel {
30+
final class BidiAppendableUnbufferedWritableByteChannel
31+
implements AppendableUnbufferedWritableByteChannel {
3132

3233
private final BidiUploadStreamingStream stream;
3334
private final ChunkSegmenter chunkSegmenter;

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java

Lines changed: 139 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,28 @@
1919
import static com.google.cloud.storage.ByteSizeConstants._256KiB;
2020
import static java.util.Objects.requireNonNull;
2121

22+
import com.google.api.core.ApiFuture;
2223
import com.google.api.core.ApiFutures;
2324
import com.google.api.core.BetaApi;
2425
import com.google.api.core.InternalApi;
26+
import com.google.api.core.SettableApiFuture;
2527
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
2628
import com.google.api.gax.rpc.AbortedException;
2729
import com.google.api.gax.rpc.ApiException;
30+
import com.google.cloud.storage.BidiUploadState.AppendableUploadState;
31+
import com.google.cloud.storage.BidiUploadState.TakeoverAppendableUploadState;
2832
import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel;
2933
import com.google.cloud.storage.BlobAppendableUploadImpl.AppendableObjectBufferedWritableByteChannel;
3034
import com.google.cloud.storage.Storage.BlobWriteOption;
3135
import com.google.cloud.storage.TransportCompatibility.Transport;
3236
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
3337
import com.google.cloud.storage.UnifiedOpts.Opts;
38+
import com.google.common.base.MoreObjects;
3439
import com.google.storage.v2.BidiWriteObjectRequest;
3540
import com.google.storage.v2.BidiWriteObjectResponse;
3641
import com.google.storage.v2.Object;
42+
import com.google.storage.v2.ServiceConstants.Values;
43+
import java.util.function.BiFunction;
3744
import javax.annotation.concurrent.Immutable;
3845

3946
/**
@@ -53,17 +60,20 @@ public final class BlobAppendableUploadConfig {
5360
new BlobAppendableUploadConfig(
5461
FlushPolicy.minFlushSize(_256KiB),
5562
Hasher.enabled(),
56-
CloseAction.CLOSE_WITHOUT_FINALIZING);
63+
CloseAction.CLOSE_WITHOUT_FINALIZING,
64+
false);
5765

5866
private final FlushPolicy flushPolicy;
5967
private final Hasher hasher;
6068
private final CloseAction closeAction;
69+
private final boolean newImpl;
6170

6271
private BlobAppendableUploadConfig(
63-
FlushPolicy flushPolicy, Hasher hasher, CloseAction closeAction) {
72+
FlushPolicy flushPolicy, Hasher hasher, CloseAction closeAction, boolean newImpl) {
6473
this.flushPolicy = flushPolicy;
6574
this.hasher = hasher;
6675
this.closeAction = closeAction;
76+
this.newImpl = newImpl;
6777
}
6878

6979
/**
@@ -94,7 +104,7 @@ public BlobAppendableUploadConfig withFlushPolicy(FlushPolicy flushPolicy) {
94104
if (this.flushPolicy.equals(flushPolicy)) {
95105
return this;
96106
}
97-
return new BlobAppendableUploadConfig(flushPolicy, hasher, closeAction);
107+
return new BlobAppendableUploadConfig(flushPolicy, hasher, closeAction, newImpl);
98108
}
99109

100110
/**
@@ -124,7 +134,7 @@ public BlobAppendableUploadConfig withCloseAction(CloseAction closeAction) {
124134
if (this.closeAction == closeAction) {
125135
return this;
126136
}
127-
return new BlobAppendableUploadConfig(flushPolicy, hasher, closeAction);
137+
return new BlobAppendableUploadConfig(flushPolicy, hasher, closeAction, newImpl);
128138
}
129139

130140
/**
@@ -156,7 +166,7 @@ BlobAppendableUploadConfig withCrc32cValidationEnabled(boolean enabled) {
156166
return this;
157167
}
158168
return new BlobAppendableUploadConfig(
159-
flushPolicy, enabled ? Hasher.enabled() : Hasher.noop(), closeAction);
169+
flushPolicy, enabled ? Hasher.enabled() : Hasher.noop(), closeAction, newImpl);
160170
}
161171

162172
/** Never to be made public until {@link Hasher} is public */
@@ -165,6 +175,18 @@ Hasher getHasher() {
165175
return hasher;
166176
}
167177

178+
BlobAppendableUploadConfig useNewImpl() {
179+
return new BlobAppendableUploadConfig(flushPolicy, hasher, closeAction, true);
180+
}
181+
182+
@Override
183+
public String toString() {
184+
return MoreObjects.toStringHelper(this)
185+
.add("closeAction", closeAction)
186+
.add("flushPolicy", flushPolicy)
187+
.toString();
188+
}
189+
168190
/**
169191
* Default instance factory method.
170192
*
@@ -217,55 +239,123 @@ public enum CloseAction {
217239
}
218240

219241
BlobAppendableUpload create(GrpcStorageImpl storage, BlobInfo info, Opts<ObjectTargetOpt> opts) {
220-
boolean takeOver = info.getGeneration() != null;
221-
BidiWriteObjectRequest req =
222-
takeOver
223-
? storage.getBidiWriteObjectRequestForTakeover(info, opts)
224-
: storage.getBidiWriteObjectRequest(info, opts);
225-
226-
BidiAppendableWrite baw = new BidiAppendableWrite(req, takeOver);
242+
if (newImpl) {
243+
// TODO: make configurable
244+
int maxRedirectsAllowed = 3;
227245

246+
long maxPendingBytes = this.getFlushPolicy().getMaxPendingBytes();
247+
AppendableUploadState state = storage.getAppendableState(info, opts, maxPendingBytes);
228248
WritableByteChannelSession<AppendableObjectBufferedWritableByteChannel, BidiWriteObjectResponse>
229249
build =
230-
ResumableMedia.gapic()
231-
.write()
232-
.bidiByteChannel(storage.storageClient.bidiWriteObjectCallable())
233-
.setHasher(this.getHasher())
234-
.setByteStringStrategy(ByteStringStrategy.copy())
235-
.appendable()
236-
.withRetryConfig(
237-
storage.retrier.withAlg(
238-
new BasicResultRetryAlgorithm<Object>() {
239-
@Override
240-
public boolean shouldRetry(
241-
Throwable previousThrowable, Object previousResponse) {
242-
// TODO: remove this later once the redirects are not handled by the
243-
// retry loop
244-
ApiException apiEx = null;
245-
if (previousThrowable instanceof StorageException) {
246-
StorageException se = (StorageException) previousThrowable;
247-
Throwable cause = se.getCause();
248-
if (cause instanceof ApiException) {
249-
apiEx = (ApiException) cause;
250+
new AppendableSession(
251+
ApiFutures.immediateFuture(state),
252+
(start, resultFuture) -> {
253+
BidiUploadStreamingStream stream =
254+
new BidiUploadStreamingStream(
255+
start,
256+
storage.storageDataClient.executor,
257+
storage.storageClient.bidiWriteObjectCallable(),
258+
maxRedirectsAllowed,
259+
storage.storageDataClient.retryContextProvider.create());
260+
ChunkSegmenter chunkSegmenter =
261+
new ChunkSegmenter(
262+
Hasher.enabled(),
263+
ByteStringStrategy.copy(),
264+
Math.min(
265+
Values.MAX_WRITE_CHUNK_BYTES_VALUE, Math.toIntExact(maxPendingBytes)),
266+
/* blockSize= */ 1);
267+
BidiAppendableUnbufferedWritableByteChannel c;
268+
if (state instanceof TakeoverAppendableUploadState) {
269+
// start the takeover reconciliation
270+
stream.awaitTakeoverStateReconciliation();
271+
c =
272+
new BidiAppendableUnbufferedWritableByteChannel(
273+
stream, chunkSegmenter, state.getConfirmedBytes());
274+
} else {
275+
c = new BidiAppendableUnbufferedWritableByteChannel(stream, chunkSegmenter, 0);
276+
}
277+
return new AppendableObjectBufferedWritableByteChannel(
278+
flushPolicy.createBufferedChannel(c),
279+
c,
280+
this.closeAction == CloseAction.FINALIZE_WHEN_CLOSING,
281+
newImpl);
282+
},
283+
state.getResultFuture());
284+
285+
return new BlobAppendableUploadImpl(
286+
new DefaultBlobWriteSessionConfig.DecoratedWritableByteChannelSession<>(
287+
build, BidiBlobWriteSessionConfig.Factory.WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER));
288+
} else {
289+
boolean takeOver = info.getGeneration() != null;
290+
BidiWriteObjectRequest req =
291+
takeOver
292+
? storage.getBidiWriteObjectRequestForTakeover(info, opts)
293+
: storage.getBidiWriteObjectRequest(info, opts);
294+
295+
BidiAppendableWrite baw = new BidiAppendableWrite(req, takeOver);
296+
297+
WritableByteChannelSession<
298+
AppendableObjectBufferedWritableByteChannel, BidiWriteObjectResponse>
299+
build =
300+
ResumableMedia.gapic()
301+
.write()
302+
.bidiByteChannel(storage.storageClient.bidiWriteObjectCallable())
303+
.setHasher(this.getHasher())
304+
.setByteStringStrategy(ByteStringStrategy.copy())
305+
.appendable()
306+
.withRetryConfig(
307+
storage.retrier.withAlg(
308+
new BasicResultRetryAlgorithm<Object>() {
309+
@Override
310+
public boolean shouldRetry(
311+
Throwable previousThrowable, Object previousResponse) {
312+
// TODO: remove this later once the redirects are not handled by the
313+
// retry loop
314+
ApiException apiEx = null;
315+
if (previousThrowable instanceof StorageException) {
316+
StorageException se = (StorageException) previousThrowable;
317+
Throwable cause = se.getCause();
318+
if (cause instanceof ApiException) {
319+
apiEx = (ApiException) cause;
320+
}
250321
}
322+
if (apiEx instanceof AbortedException) {
323+
return true;
324+
}
325+
return storage
326+
.retryAlgorithmManager
327+
.idempotent()
328+
.shouldRetry(previousThrowable, null);
251329
}
252-
if (apiEx instanceof AbortedException) {
253-
return true;
254-
}
255-
return storage
256-
.retryAlgorithmManager
257-
.idempotent()
258-
.shouldRetry(previousThrowable, null);
259-
}
260-
}))
261-
.buffered(this.getFlushPolicy())
262-
.setStartAsync(ApiFutures.immediateFuture(baw))
263-
.setGetCallable(storage.storageClient.getObjectCallable())
264-
.setFinalizeOnClose(this.closeAction == CloseAction.FINALIZE_WHEN_CLOSING)
265-
.build();
330+
}))
331+
.buffered(this.getFlushPolicy())
332+
.setStartAsync(ApiFutures.immediateFuture(baw))
333+
.setGetCallable(storage.storageClient.getObjectCallable())
334+
.setFinalizeOnClose(this.closeAction == CloseAction.FINALIZE_WHEN_CLOSING)
335+
.build();
266336

267-
return new BlobAppendableUploadImpl(
268-
new DefaultBlobWriteSessionConfig.DecoratedWritableByteChannelSession<>(
269-
build, BidiBlobWriteSessionConfig.Factory.WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER));
337+
return new BlobAppendableUploadImpl(
338+
new DefaultBlobWriteSessionConfig.DecoratedWritableByteChannelSession<>(
339+
build, BidiBlobWriteSessionConfig.Factory.WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER));
340+
}
341+
}
342+
343+
private static final class AppendableSession
344+
extends ChannelSession<
345+
AppendableUploadState,
346+
BidiWriteObjectResponse,
347+
AppendableObjectBufferedWritableByteChannel>
348+
implements WritableByteChannelSession<
349+
AppendableObjectBufferedWritableByteChannel, BidiWriteObjectResponse> {
350+
private AppendableSession(
351+
ApiFuture<AppendableUploadState> startFuture,
352+
BiFunction<
353+
AppendableUploadState,
354+
SettableApiFuture<BidiWriteObjectResponse>,
355+
AppendableObjectBufferedWritableByteChannel>
356+
f,
357+
SettableApiFuture<BidiWriteObjectResponse> resultFuture) {
358+
super(startFuture, f, resultFuture);
359+
}
270360
}
271361
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadImpl.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.api.core.ApiFuture;
2020
import com.google.api.core.BetaApi;
2121
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
22+
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
2223
import com.google.common.base.Preconditions;
2324
import java.io.IOException;
2425
import java.nio.ByteBuffer;
@@ -63,17 +64,20 @@ static final class AppendableObjectBufferedWritableByteChannel
6364
implements BufferedWritableByteChannel,
6465
BlobAppendableUpload.AppendableUploadWriteableByteChannel {
6566
private final BufferedWritableByteChannel buffered;
66-
private final GapicBidiUnbufferedAppendableWritableByteChannel unbuffered;
67+
private final AppendableUnbufferedWritableByteChannel unbuffered;
6768
private final boolean finalizeOnClose;
6869
private final ReentrantLock lock;
70+
private final boolean newImpl;
6971

7072
AppendableObjectBufferedWritableByteChannel(
7173
BufferedWritableByteChannel buffered,
72-
GapicBidiUnbufferedAppendableWritableByteChannel unbuffered,
73-
boolean finalizeOnClose) {
74+
AppendableUnbufferedWritableByteChannel unbuffered,
75+
boolean finalizeOnClose,
76+
boolean newImpl) {
7477
this.buffered = buffered;
7578
this.unbuffered = unbuffered;
7679
this.finalizeOnClose = finalizeOnClose;
80+
this.newImpl = newImpl;
7781
lock = new ReentrantLock();
7882
}
7983

@@ -111,10 +115,17 @@ public boolean isOpen() {
111115
public void finalizeAndClose() throws IOException {
112116
lock.lock();
113117
try {
114-
if (buffered.isOpen()) {
115-
buffered.flush();
116-
unbuffered.finalizeWrite();
117-
buffered.close();
118+
if (newImpl) {
119+
if (buffered.isOpen()) {
120+
unbuffered.nextWriteShouldFinalize();
121+
buffered.close();
122+
}
123+
} else {
124+
if (buffered.isOpen()) {
125+
buffered.flush();
126+
unbuffered.finalizeWrite();
127+
buffered.close();
128+
}
118129
}
119130
} finally {
120131
lock.unlock();
@@ -142,4 +153,11 @@ public void close() throws IOException {
142153
}
143154
}
144155
}
156+
157+
/** Temporary shim to allow selective implementation */
158+
interface AppendableUnbufferedWritableByteChannel extends UnbufferedWritableByteChannel {
159+
default void nextWriteShouldFinalize() {}
160+
161+
default void finalizeWrite() throws IOException {}
162+
}
145163
}

google-cloud-storage/src/main/java/com/google/cloud/storage/ChannelSession.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,16 @@ class ChannelSession<StartT, ResultT, ChannelT> {
3939

4040
ChannelSession(
4141
ApiFuture<StartT> startFuture, BiFunction<StartT, SettableApiFuture<ResultT>, ChannelT> f) {
42+
this(startFuture, f, SettableApiFuture.create());
43+
}
44+
45+
ChannelSession(
46+
ApiFuture<StartT> startFuture,
47+
BiFunction<StartT, SettableApiFuture<ResultT>, ChannelT> f,
48+
SettableApiFuture<ResultT> resultFuture) {
4249
this.startFuture = startFuture;
43-
this.resultFuture = SettableApiFuture.create();
44-
this.f = (s) -> f.apply(s, resultFuture);
50+
this.resultFuture = resultFuture;
51+
this.f = (s) -> f.apply(s, this.resultFuture);
4552
}
4653

4754
public ApiFuture<ChannelT> openAsync() {

0 commit comments

Comments
 (0)