24
24
import com .google .api .core .BetaApi ;
25
25
import com .google .api .core .InternalApi ;
26
26
import com .google .api .core .SettableApiFuture ;
27
- import com .google .api .gax .retrying .BasicResultRetryAlgorithm ;
28
- import com .google .api .gax .rpc .AbortedException ;
29
- import com .google .api .gax .rpc .ApiException ;
30
27
import com .google .cloud .storage .BidiUploadState .AppendableUploadState ;
31
28
import com .google .cloud .storage .BidiUploadState .TakeoverAppendableUploadState ;
32
29
import com .google .cloud .storage .BlobAppendableUpload .AppendableUploadWriteableByteChannel ;
36
33
import com .google .cloud .storage .UnifiedOpts .ObjectTargetOpt ;
37
34
import com .google .cloud .storage .UnifiedOpts .Opts ;
38
35
import com .google .common .base .MoreObjects ;
39
- import com .google .storage .v2 .BidiWriteObjectRequest ;
40
36
import com .google .storage .v2 .BidiWriteObjectResponse ;
41
- import com .google .storage .v2 .Object ;
42
37
import com .google .storage .v2 .ServiceConstants .Values ;
43
38
import java .util .function .BiFunction ;
44
39
import javax .annotation .concurrent .Immutable ;
@@ -60,20 +55,17 @@ public final class BlobAppendableUploadConfig {
60
55
new BlobAppendableUploadConfig (
61
56
FlushPolicy .minFlushSize (_256KiB ),
62
57
Hasher .enabled (),
63
- CloseAction .CLOSE_WITHOUT_FINALIZING ,
64
- false );
58
+ CloseAction .CLOSE_WITHOUT_FINALIZING );
65
59
66
60
private final FlushPolicy flushPolicy ;
67
61
private final Hasher hasher ;
68
62
private final CloseAction closeAction ;
69
- private final boolean newImpl ;
70
63
71
64
private BlobAppendableUploadConfig (
72
- FlushPolicy flushPolicy , Hasher hasher , CloseAction closeAction , boolean newImpl ) {
65
+ FlushPolicy flushPolicy , Hasher hasher , CloseAction closeAction ) {
73
66
this .flushPolicy = flushPolicy ;
74
67
this .hasher = hasher ;
75
68
this .closeAction = closeAction ;
76
- this .newImpl = newImpl ;
77
69
}
78
70
79
71
/**
@@ -104,7 +96,7 @@ public BlobAppendableUploadConfig withFlushPolicy(FlushPolicy flushPolicy) {
104
96
if (this .flushPolicy .equals (flushPolicy )) {
105
97
return this ;
106
98
}
107
- return new BlobAppendableUploadConfig (flushPolicy , hasher , closeAction , newImpl );
99
+ return new BlobAppendableUploadConfig (flushPolicy , hasher , closeAction );
108
100
}
109
101
110
102
/**
@@ -134,7 +126,7 @@ public BlobAppendableUploadConfig withCloseAction(CloseAction closeAction) {
134
126
if (this .closeAction == closeAction ) {
135
127
return this ;
136
128
}
137
- return new BlobAppendableUploadConfig (flushPolicy , hasher , closeAction , newImpl );
129
+ return new BlobAppendableUploadConfig (flushPolicy , hasher , closeAction );
138
130
}
139
131
140
132
/**
@@ -166,7 +158,7 @@ BlobAppendableUploadConfig withCrc32cValidationEnabled(boolean enabled) {
166
158
return this ;
167
159
}
168
160
return new BlobAppendableUploadConfig (
169
- flushPolicy , enabled ? Hasher .enabled () : Hasher .noop (), closeAction , newImpl );
161
+ flushPolicy , enabled ? Hasher .enabled () : Hasher .noop (), closeAction );
170
162
}
171
163
172
164
/** Never to be made public until {@link Hasher} is public */
@@ -175,10 +167,6 @@ Hasher getHasher() {
175
167
return hasher ;
176
168
}
177
169
178
- BlobAppendableUploadConfig useNewImpl () {
179
- return new BlobAppendableUploadConfig (flushPolicy , hasher , closeAction , true );
180
- }
181
-
182
170
@ Override
183
171
public String toString () {
184
172
return MoreObjects .toStringHelper (this )
@@ -239,109 +227,50 @@ public enum CloseAction {
239
227
}
240
228
241
229
BlobAppendableUpload create (GrpcStorageImpl storage , BlobInfo info , Opts <ObjectTargetOpt > opts ) {
242
- if (newImpl ) {
243
- // TODO: make configurable
244
- int maxRedirectsAllowed = 3 ;
245
-
246
- long maxPendingBytes = this .getFlushPolicy ().getMaxPendingBytes ();
247
- AppendableUploadState state = storage .getAppendableState (info , opts , maxPendingBytes );
248
- WritableByteChannelSession <
249
- AppendableObjectBufferedWritableByteChannel , BidiWriteObjectResponse >
250
- build =
251
- new AppendableSession (
252
- ApiFutures .immediateFuture (state ),
253
- (start , resultFuture ) -> {
254
- BidiUploadStreamingStream stream =
255
- new BidiUploadStreamingStream (
256
- start ,
257
- storage .storageDataClient .executor ,
258
- storage .storageClient .bidiWriteObjectCallable (),
259
- maxRedirectsAllowed ,
260
- storage .storageDataClient .retryContextProvider .create ());
261
- ChunkSegmenter chunkSegmenter =
262
- new ChunkSegmenter (
263
- Hasher .enabled (),
264
- ByteStringStrategy .copy (),
265
- Math .min (
266
- Values .MAX_WRITE_CHUNK_BYTES_VALUE ,
267
- Math .toIntExact (maxPendingBytes )),
268
- /* blockSize= */ 1 );
269
- BidiAppendableUnbufferedWritableByteChannel c ;
270
- if (state instanceof TakeoverAppendableUploadState ) {
271
- // start the takeover reconciliation
272
- stream .awaitTakeoverStateReconciliation ();
273
- c =
274
- new BidiAppendableUnbufferedWritableByteChannel (
275
- stream , chunkSegmenter , state .getConfirmedBytes ());
276
- } else {
277
- c =
278
- new BidiAppendableUnbufferedWritableByteChannel (
279
- stream , chunkSegmenter , 0 );
280
- }
281
- return new AppendableObjectBufferedWritableByteChannel (
282
- flushPolicy .createBufferedChannel (c ),
283
- c ,
284
- this .closeAction == CloseAction .FINALIZE_WHEN_CLOSING ,
285
- newImpl );
286
- },
287
- state .getResultFuture ());
230
+ // TODO: make configurable
231
+ int maxRedirectsAllowed = 3 ;
288
232
289
- return new BlobAppendableUploadImpl (
290
- new DefaultBlobWriteSessionConfig .DecoratedWritableByteChannelSession <>(
291
- build , BidiBlobWriteSessionConfig .Factory .WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER ));
292
- } else {
293
- boolean takeOver = info .getGeneration () != null ;
294
- BidiWriteObjectRequest req =
295
- takeOver
296
- ? storage .getBidiWriteObjectRequestForTakeover (info , opts )
297
- : storage .getBidiWriteObjectRequest (info , opts );
233
+ long maxPendingBytes = this .getFlushPolicy ().getMaxPendingBytes ();
234
+ AppendableUploadState state = storage .getAppendableState (info , opts , maxPendingBytes );
235
+ WritableByteChannelSession <AppendableObjectBufferedWritableByteChannel , BidiWriteObjectResponse >
236
+ build =
237
+ new AppendableSession (
238
+ ApiFutures .immediateFuture (state ),
239
+ (start , resultFuture ) -> {
240
+ BidiUploadStreamingStream stream =
241
+ new BidiUploadStreamingStream (
242
+ start ,
243
+ storage .storageDataClient .executor ,
244
+ storage .storageClient .bidiWriteObjectCallable (),
245
+ maxRedirectsAllowed ,
246
+ storage .storageDataClient .retryContextProvider .create ());
247
+ ChunkSegmenter chunkSegmenter =
248
+ new ChunkSegmenter (
249
+ Hasher .enabled (),
250
+ ByteStringStrategy .copy (),
251
+ Math .min (
252
+ Values .MAX_WRITE_CHUNK_BYTES_VALUE , Math .toIntExact (maxPendingBytes )),
253
+ /* blockSize= */ 1 );
254
+ BidiAppendableUnbufferedWritableByteChannel c ;
255
+ if (state instanceof TakeoverAppendableUploadState ) {
256
+ // start the takeover reconciliation
257
+ stream .awaitTakeoverStateReconciliation ();
258
+ c =
259
+ new BidiAppendableUnbufferedWritableByteChannel (
260
+ stream , chunkSegmenter , state .getConfirmedBytes ());
261
+ } else {
262
+ c = new BidiAppendableUnbufferedWritableByteChannel (stream , chunkSegmenter , 0 );
263
+ }
264
+ return new AppendableObjectBufferedWritableByteChannel (
265
+ flushPolicy .createBufferedChannel (c ),
266
+ c ,
267
+ this .closeAction == CloseAction .FINALIZE_WHEN_CLOSING );
268
+ },
269
+ state .getResultFuture ());
298
270
299
- BidiAppendableWrite baw = new BidiAppendableWrite (req , takeOver );
300
-
301
- WritableByteChannelSession <
302
- AppendableObjectBufferedWritableByteChannel , BidiWriteObjectResponse >
303
- build =
304
- ResumableMedia .gapic ()
305
- .write ()
306
- .bidiByteChannel (storage .storageClient .bidiWriteObjectCallable ())
307
- .setHasher (this .getHasher ())
308
- .setByteStringStrategy (ByteStringStrategy .copy ())
309
- .appendable ()
310
- .withRetryConfig (
311
- storage .retrier .withAlg (
312
- new BasicResultRetryAlgorithm <Object >() {
313
- @ Override
314
- public boolean shouldRetry (
315
- Throwable previousThrowable , Object previousResponse ) {
316
- // TODO: remove this later once the redirects are not handled by the
317
- // retry loop
318
- ApiException apiEx = null ;
319
- if (previousThrowable instanceof StorageException ) {
320
- StorageException se = (StorageException ) previousThrowable ;
321
- Throwable cause = se .getCause ();
322
- if (cause instanceof ApiException ) {
323
- apiEx = (ApiException ) cause ;
324
- }
325
- }
326
- if (apiEx instanceof AbortedException ) {
327
- return true ;
328
- }
329
- return storage
330
- .retryAlgorithmManager
331
- .idempotent ()
332
- .shouldRetry (previousThrowable , null );
333
- }
334
- }))
335
- .buffered (this .getFlushPolicy ())
336
- .setStartAsync (ApiFutures .immediateFuture (baw ))
337
- .setGetCallable (storage .storageClient .getObjectCallable ())
338
- .setFinalizeOnClose (this .closeAction == CloseAction .FINALIZE_WHEN_CLOSING )
339
- .build ();
340
-
341
- return new BlobAppendableUploadImpl (
342
- new DefaultBlobWriteSessionConfig .DecoratedWritableByteChannelSession <>(
343
- build , BidiBlobWriteSessionConfig .Factory .WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER ));
344
- }
271
+ return new BlobAppendableUploadImpl (
272
+ new DefaultBlobWriteSessionConfig .DecoratedWritableByteChannelSession <>(
273
+ build , BidiBlobWriteSessionConfig .Factory .WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER ));
345
274
}
346
275
347
276
private static final class AppendableSession
0 commit comments