Skip to content

Commit 0858e8a

Browse files
committed
chore: add BidiAppendableUnbufferedWritableByteChannel
This will have tests added in a followup commit
1 parent f3d0fff commit 0858e8a

File tree

1 file changed

+189
-0
lines changed

1 file changed

+189
-0
lines changed
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import com.google.cloud.BaseServiceException;
20+
import com.google.cloud.storage.ChunkSegmenter.ChunkSegment;
21+
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
22+
import java.io.IOException;
23+
import java.io.InterruptedIOException;
24+
import java.nio.ByteBuffer;
25+
import java.nio.channels.ClosedChannelException;
26+
import java.util.concurrent.ExecutionException;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.TimeoutException;
29+
30+
final class BidiAppendableUnbufferedWritableByteChannel implements UnbufferedWritableByteChannel {
31+
32+
private final BidiUploadStreamingStream stream;
33+
private final ChunkSegmenter chunkSegmenter;
34+
35+
private boolean open;
36+
private long writeOffset;
37+
private volatile boolean nextWriteShouldFinalize;
38+
private boolean writeCalledAtLeastOnce;
39+
40+
/** If write throws an error, don't attempt to finalize things when {@link #close()} is called. */
41+
private boolean writeThrewError;
42+
43+
BidiAppendableUnbufferedWritableByteChannel(
44+
BidiUploadStreamingStream stream, ChunkSegmenter chunkSegmenter, long writeOffset) {
45+
this.stream = stream;
46+
this.chunkSegmenter = chunkSegmenter;
47+
this.open = true;
48+
this.writeOffset = writeOffset;
49+
this.nextWriteShouldFinalize = false;
50+
this.writeThrewError = false;
51+
}
52+
53+
@Override
54+
public long write(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException {
55+
return internalWrite(srcs, srcsOffset, srcsLength);
56+
}
57+
58+
@Override
59+
public long writeAndClose(ByteBuffer[] srcs, int offset, int length) throws IOException {
60+
long totalRemaining = Buffers.totalRemaining(srcs, offset, length);
61+
long written;
62+
do {
63+
written = internalWrite(srcs, offset, length);
64+
} while (written < totalRemaining);
65+
close();
66+
return written;
67+
}
68+
69+
@Override
70+
public boolean isOpen() {
71+
return open;
72+
}
73+
74+
@Override
75+
public void close() throws IOException {
76+
if (!open) {
77+
return;
78+
}
79+
try {
80+
if (writeThrewError) {
81+
return;
82+
}
83+
84+
if (!writeCalledAtLeastOnce) {
85+
stream.flush();
86+
}
87+
if (nextWriteShouldFinalize) {
88+
//noinspection StatementWithEmptyBody
89+
while (!stream.finishWrite(writeOffset)) {}
90+
} else {
91+
//noinspection StatementWithEmptyBody
92+
while (!stream.closeStream(writeOffset)) {}
93+
}
94+
95+
awaitResultFuture();
96+
} finally {
97+
stream.sendClose();
98+
open = false;
99+
}
100+
}
101+
102+
public void nextWriteShouldFinalize() {
103+
this.nextWriteShouldFinalize = true;
104+
}
105+
106+
private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException {
107+
if (!open) {
108+
throw new ClosedChannelException();
109+
}
110+
// error early. if the result future is already failed, await it to throw the error
111+
if (stream.getResultFuture().isDone()) {
112+
awaitResultFuture();
113+
return 0;
114+
}
115+
writeCalledAtLeastOnce = true;
116+
117+
long availableCapacity = stream.availableCapacity();
118+
if (availableCapacity <= 0) {
119+
return 0;
120+
}
121+
RewindableContent rewindableContent = RewindableContent.of(srcs, srcsOffset, srcsLength);
122+
long totalBufferRemaining = rewindableContent.getLength();
123+
124+
ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength, true);
125+
if (data.length == 0) {
126+
return 0;
127+
}
128+
129+
long bytesConsumed = 0;
130+
for (int i = 0, len = data.length, lastIdx = len - 1; i < len; i++) {
131+
ChunkSegment datum = data[i];
132+
int size = datum.getB().size();
133+
boolean appended;
134+
if (i < lastIdx) {
135+
appended = stream.append(datum);
136+
} else if (i == lastIdx && nextWriteShouldFinalize) {
137+
appended = stream.appendAndFinalize(datum);
138+
} else {
139+
appended = stream.appendAndFlush(datum);
140+
}
141+
if (appended) {
142+
bytesConsumed += size;
143+
writeOffset += size;
144+
} else {
145+
// if we weren't able to trigger a flush by reaching the end of the array and calling
146+
// appendAndFlush, explicitly call flush here so that some progress can be made.
147+
// we prefer appendAndFlush so a separate message is not needed, but an extra message
148+
// in order to make progress and free buffer space is better than ending up in a live-lock.
149+
stream.flush();
150+
break;
151+
}
152+
}
153+
154+
if (bytesConsumed != totalBufferRemaining) {
155+
rewindableContent.rewindTo(bytesConsumed);
156+
}
157+
158+
return bytesConsumed;
159+
}
160+
161+
private void awaitResultFuture() throws IOException {
162+
try {
163+
stream.getResultFuture().get(10_717, TimeUnit.MILLISECONDS);
164+
} catch (InterruptedException e) {
165+
Thread.currentThread().interrupt();
166+
InterruptedIOException ioe = new InterruptedIOException();
167+
ioe.initCause(e);
168+
writeThrewError = true;
169+
throw ioe;
170+
} catch (ExecutionException e) {
171+
BaseServiceException coalesce = StorageException.coalesce(e.getCause());
172+
String message = coalesce.getMessage();
173+
String ioExceptionMessage = message;
174+
// if the failure is an upload scenario we detect client side, it's message will be
175+
// verbose. To avoid duplication, select the first line only for the io exception
176+
int firstNewLineIndex = message != null ? message.indexOf('\n') : -1;
177+
if (firstNewLineIndex > -1) {
178+
ioExceptionMessage = message.substring(0, firstNewLineIndex);
179+
}
180+
IOException ioException = new IOException(ioExceptionMessage, coalesce);
181+
// ioException.addSuppressed(new AsyncStorageTaskException());
182+
writeThrewError = true;
183+
throw ioException;
184+
} catch (TimeoutException e) {
185+
writeThrewError = true;
186+
throw new IOException(e);
187+
}
188+
}
189+
}

0 commit comments

Comments
 (0)