Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@ interface AppendableUploadWriteableByteChannel extends WritableByteChannel {
* operation upon this channel, however, then an invocation of this method will block until the
* first operation is complete.
*
* <p>If your application needs to empty its ByteBuffer before progressing, use our helper
* method {@link StorageNonBlockingChannelUtils#blockingEmptyTo(ByteBuffer,
* WritableByteChannel)} like so:
*
* <pre>{@code
* try (AppendableUploadWriteableByteChannel channel = session.open()) {
* int written = StorageNonBlockingChannelUtils.blockingEmptyTo(byteBuffer, channel);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ni: it's weird that "Non blocking utils" has blocking utility methods.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/StorageNonBlockingChannelUtils/StorageChannelUtils/g?

* }
* }</pre>
*
* @param src The buffer from which bytes are to be retrieved
* @return The number of bytes written, possibly zero
* @throws ClosedChannelException If this channel is closed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,33 +159,11 @@ static int alignSize(int size, int alignmentMultiple) {
}

static int fillFrom(ByteBuffer buf, ReadableByteChannel c) throws IOException {
int total = 0;
while (buf.hasRemaining()) {
int read = c.read(buf);
if (read != -1) {
total += read;
} else if (total == 0) {
return -1;
} else {
break;
}
}
return total;
return StorageNonBlockingChannelUtils.blockingFillFrom(buf, c);
}

static int emptyTo(ByteBuffer buf, WritableByteChannel c) throws IOException {
int total = 0;
while (buf.hasRemaining()) {
int written = c.write(buf);
if (written != -1) {
total += written;
} else if (total == 0) {
return -1;
} else {
break;
}
}
return total;
return StorageNonBlockingChannelUtils.blockingEmptyTo(buf, c);
}

static long totalRemaining(ByteBuffer[] buffers, int offset, int length) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

/**
* Set of utility methods for working with non-blocking channels returned by this library.
*
* @since 2.56.0
*/
public final class StorageNonBlockingChannelUtils {

private StorageNonBlockingChannelUtils() {}

/**
* Attempt to fill {@code buf} from {@code c}, blocking the invoking thread if necessary in order
* to do so.
*
* <p>This method will not close {@code c}.
*
* @return The number of bytes read, possibly zero, or {@code -1} if the channel has reached
* end-of-stream
* @throws IOException any IOException from calling {@link ReadableByteChannel#read(ByteBuffer)}
* @since 2.56.0
*/
public static int blockingFillFrom(ByteBuffer buf, ReadableByteChannel c) throws IOException {
int total = 0;
while (buf.hasRemaining()) {
int read = c.read(buf);
if (read != -1) {
total += read;
} else if (total == 0) {
return -1;
} else {
break;
}
}
return total;
}

/**
* Attempt to empty {@code buf} to {@code c}, blocking the invoking thread if necessary in order
* to do so.
*
* <p>This method will not close {@code c}
*
* @return The number of bytes written, possibly zero
* @throws IOException any IOException from calling {@link WritableByteChannel#write(ByteBuffer)}
* @since 2.56.0
*/
public static int blockingEmptyTo(ByteBuffer buf, WritableByteChannel c) throws IOException {
int total = 0;
while (buf.hasRemaining()) {
int written = c.write(buf);
if (written != 0) {
total += written;
}
}
return total;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,10 @@

package com.google.cloud.storage;

import static com.google.cloud.storage.TestUtils.assertAll;
import static com.google.cloud.storage.TestUtils.xxd;
import static com.google.common.truth.Truth.assertThat;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.security.SecureRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;

public final class BuffersTest {
Expand Down Expand Up @@ -77,87 +72,4 @@ public void allocateAligned_evenlyDivisible_capacityGtAlignment() {
ByteBuffer b1 = Buffers.allocateAligned(8, 4);
assertThat(b1.capacity()).isEqualTo(8);
}

@Test
public void fillFrom_handles_0SizeRead_someBytesRead() throws Exception {
byte[] bytes = new byte[14];
ByteBuffer buf = ByteBuffer.wrap(bytes);

byte[] expected =
new byte[] {
(byte) 'A',
(byte) 'B',
(byte) 'C',
(byte) 'A',
(byte) 'B',
(byte) 'A',
(byte) 'A',
(byte) 'A',
(byte) 'B',
(byte) 'A',
(byte) 'B',
(byte) 'C',
(byte) 0,
(byte) 0
};

int[] acceptSequence = new int[] {3, 2, 1, 0, 0, 1, 2, 3};
AtomicInteger readCount = new AtomicInteger(0);

ReadableByteChannel c =
new ReadableByteChannel() {
@Override
public int read(ByteBuffer dst) throws IOException {
int i = readCount.getAndIncrement();
if (i == acceptSequence.length) {
return -1;
}
int bytesToRead = acceptSequence[i];
if (bytesToRead > 0) {
long copy =
Buffers.copy(DataGenerator.base64Characters().genByteBuffer(bytesToRead), dst);
assertThat(copy).isEqualTo(bytesToRead);
}

return bytesToRead;
}

@Override
public boolean isOpen() {
return true;
}

@Override
public void close() throws IOException {}
};
int filled = Buffers.fillFrom(buf, c);

assertAll(
() -> assertThat(filled).isEqualTo(12),
() -> assertThat(xxd(bytes)).isEqualTo(xxd(expected)));
}

@Test
public void fillFrom_handles_0SizeRead_noBytesRead() throws Exception {
ByteBuffer buf = ByteBuffer.allocate(3);

ReadableByteChannel c =
new ReadableByteChannel() {
@Override
public int read(ByteBuffer dst) throws IOException {
return -1;
}

@Override
public boolean isOpen() {
return true;
}

@Override
public void close() throws IOException {}
};
int filled = Buffers.fillFrom(buf, c);

assertThat(filled).isEqualTo(-1);
}
}
Loading
Loading