
Commit 1d27930

fapifta authored and bshashikant committed
HDDS-2233 - Remove ByteStringHelper and refactor the code to the place where it used (#1596)
1 parent 87d9f36 commit 1d27930

File tree

15 files changed, +217 -224 lines changed


hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java

Lines changed: 7 additions & 0 deletions
@@ -36,15 +36,18 @@
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;

@@ -307,6 +310,10 @@ public HddsProtos.ReplicationType getType() {
     return HddsProtos.ReplicationType.STAND_ALONE;
   }
 
+  public Function<ByteBuffer, ByteString> byteBufferToByteStringConversion(){
+    return ByteStringConversion.createByteBufferConversion(conf);
+  }
+
   /**
    * Get xceiver client metric.
    */

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java

Lines changed: 1 addition & 2 deletions
@@ -21,7 +21,6 @@
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.ByteStringHelper;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

@@ -590,7 +589,7 @@ public boolean isClosed() {
    */
   private void writeChunkToContainer(ByteBuffer chunk) throws IOException {
     int effectiveChunkSize = chunk.remaining();
-    ByteString data = ByteStringHelper.getByteString(chunk);
+    ByteString data = bufferPool.byteStringConversion().apply(chunk);
     Checksum checksum = new Checksum(checksumType, bytesPerChecksum);
     ChecksumData checksumData = checksum.computeChecksum(chunk);
     ChunkInfo chunkInfo = ChunkInfo.newBuilder()
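
To see how the refactored pieces fit together, here is a minimal wiring sketch (not part of this commit; it assumes a caller that already holds an XceiverClientManager and knows its pool dimensions):

import java.nio.ByteBuffer;
import java.util.function.Function;

import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

final class ConversionWiring {
  // Sketch: the config-driven conversion travels from the client manager
  // into the BufferPool; BlockOutputStream then picks it up through
  // bufferPool.byteStringConversion().apply(chunk) in writeChunkToContainer.
  static BufferPool newPool(XceiverClientManager manager,
      int bufferSize, int capacity) {
    Function<ByteBuffer, ByteString> conversion =
        manager.byteBufferToByteStringConversion();
    return new BufferPool(bufferSize, capacity, conversion);
  }
}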

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java

Lines changed: 15 additions & 0 deletions
@@ -19,10 +19,13 @@
 package org.apache.hadoop.hdds.scm.storage;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.ByteStringConversion;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Function;
 
 /**
  * This class creates and manages pool of n buffers.

@@ -33,12 +36,24 @@ public class BufferPool {
   private int currentBufferIndex;
   private final int bufferSize;
   private final int capacity;
+  private final Function<ByteBuffer, ByteString> byteStringConversion;
 
   public BufferPool(int bufferSize, int capacity) {
+    this(bufferSize, capacity,
+        ByteStringConversion.createByteBufferConversion(null));
+  }
+
+  public BufferPool(int bufferSize, int capacity,
+      Function<ByteBuffer, ByteString> byteStringConversion){
     this.capacity = capacity;
     this.bufferSize = bufferSize;
     bufferList = new ArrayList<>(capacity);
     currentBufferIndex = -1;
+    this.byteStringConversion = byteStringConversion;
+  }
+
+  public Function<ByteBuffer, ByteString> byteStringConversion(){
+    return byteStringConversion;
   }
 
   public ByteBuffer getCurrentBuffer() {
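
Worth noting: the two-argument constructor preserves the old behavior by passing a null Configuration, which ByteStringConversion (the new file below) treats as unsafe byte operations disabled. A small sketch of the equivalence, with illustrative pool dimensions:

import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.hdds.scm.storage.BufferPool;

final class DefaultPoolBehavior {
  static void demo() {
    // These two pools behave identically: a null config selects the safe
    // ByteString.copyFrom path, never UnsafeByteOperations.unsafeWrap.
    BufferPool implicit = new BufferPool(4 * 1024 * 1024, 16);
    BufferPool explicit = new BufferPool(4 * 1024 * 1024, 16,
        ByteStringConversion.createByteBufferConversion(null));
  }
}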
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java

Lines changed: 62 additions & 0 deletions

@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
+import java.nio.ByteBuffer;
+import java.util.function.Function;
+
+/**
+ * Helper class to create a conversion function from ByteBuffer to ByteString
+ * based on the property
+ * {@link OzoneConfigKeys#OZONE_UNSAFEBYTEOPERATIONS_ENABLED} in the
+ * Ozone configuration.
+ */
+public final class ByteStringConversion {
+  private ByteStringConversion(){} // no instantiation.
+
+  /**
+   * Creates the conversion function to be used to convert ByteBuffers to
+   * ByteString instances to be used in protobuf messages.
+   *
+   * @param config the Ozone configuration
+   * @return the conversion function defined by
+   *         {@link OzoneConfigKeys#OZONE_UNSAFEBYTEOPERATIONS_ENABLED}
+   * @see <pre>ByteBuffer</pre>
+   */
+  public static Function<ByteBuffer, ByteString> createByteBufferConversion(
+      Configuration config){
+    boolean unsafeEnabled =
+        config!=null && config.getBoolean(
+            OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
+            OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
+    if (unsafeEnabled) {
+      return buffer -> UnsafeByteOperations.unsafeWrap(buffer);
+    } else {
+      return buffer -> {
+        ByteString retval = ByteString.copyFrom(buffer);
+        buffer.flip();
+        return retval;
+      };
+    }
+  }
+}
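
A usage sketch of the new helper (not part of the commit; OzoneConfiguration and the sample data are assumptions) contrasting the two conversion paths:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

public class ConversionDemo {
  public static void main(String[] args) {
    ByteBuffer buf =
        ByteBuffer.wrap("chunk".getBytes(StandardCharsets.UTF_8));

    // Unsafe disabled (the default): the data is copied into the
    // ByteString and the buffer is flipped back so callers can re-read it.
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setBoolean(
        OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED, false);
    Function<ByteBuffer, ByteString> copying =
        ByteStringConversion.createByteBufferConversion(conf);
    ByteString copied = copying.apply(buf); // buf is readable again here

    // Unsafe enabled: the ByteString wraps the buffer without copying;
    // the caller must not mutate the buffer while the ByteString is live.
    conf.setBoolean(
        OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED, true);
    Function<ByteBuffer, ByteString> wrapping =
        ByteStringConversion.createByteBufferConversion(conf);
    ByteString wrapped = wrapping.apply(buf);
  }
}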

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringHelper.java

Lines changed: 0 additions & 69 deletions
This file was deleted.

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java

Lines changed: 22 additions & 11 deletions
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;

@@ -46,7 +47,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .PutSmallFileRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
-import org.apache.hadoop.hdds.scm.ByteStringHelper;
+import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;

@@ -102,6 +103,7 @@ public class KeyValueHandler extends Handler {
   private final ChunkManager chunkManager;
   private final VolumeChoosingPolicy volumeChoosingPolicy;
   private final long maxContainerSize;
+  private final Function<ByteBuffer, ByteString> byteBufferToByteString;
 
   // A lock that is held during container creation.
   private final AutoCloseableLock containerCreationLock;

@@ -125,10 +127,8 @@ public KeyValueHandler(Configuration config, StateContext context,
     // this handler lock is used for synchronizing createContainer Requests,
     // so using a fair lock here.
     containerCreationLock = new AutoCloseableLock(new ReentrantLock(true));
-    boolean isUnsafeByteOperationsEnabled = conf.getBoolean(
-        OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
-        OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
-    ByteStringHelper.init(isUnsafeByteOperationsEnabled);
+    byteBufferToByteString =
+        ByteStringConversion.createByteBufferConversion(conf);
   }
 
   @VisibleForTesting

@@ -547,15 +547,15 @@ ContainerCommandResponseProto handleReadChunk(
     }
 
     // The container can become unhealthy after the lock is released.
-    // The operation will likely fail/timeout in that happens.
+    // The operation will likely fail/timeout if that happens.
     try {
       checkContainerIsHealthy(kvContainer);
     } catch (StorageContainerException sce) {
       return ContainerUtils.logAndReturnError(LOG, sce, request);
     }
 
     ChunkInfo chunkInfo;
-    byte[] data;
+    ByteBuffer data;
     try {
       BlockID blockID = BlockID.getFromProtobuf(
           request.getReadChunk().getBlockID());

@@ -569,7 +569,7 @@ ContainerCommandResponseProto handleReadChunk(
 
       data = chunkManager
           .readChunk(kvContainer, blockID, chunkInfo, dispatcherContext);
-      metrics.incContainerBytesStats(Type.ReadChunk, data.length);
+      metrics.incContainerBytesStats(Type.ReadChunk, chunkInfo.getLen());
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
     } catch (IOException ex) {

@@ -578,7 +578,18 @@ ContainerCommandResponseProto handleReadChunk(
           request);
     }
 
-    return ChunkUtils.getReadChunkResponse(request, data, chunkInfo);
+    Preconditions.checkNotNull(data, "Chunk data is null");
+
+    ContainerProtos.ReadChunkResponseProto.Builder response =
+        ContainerProtos.ReadChunkResponseProto.newBuilder();
+    response.setChunkData(chunkInfo.getProtoBufMessage());
+    response.setData(byteBufferToByteString.apply(data));
+    response.setBlockID(request.getReadChunk().getBlockID());
+
+    ContainerCommandResponseProto.Builder builder =
+        ContainerUtils.getSuccessResponseBuilder(request);
+    builder.setReadChunk(response);
+    return builder.build();
   }
 
   /**

@@ -800,9 +811,9 @@ ContainerCommandResponseProto handleGetSmallFile(
     for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) {
       // if the block is committed, all chunks must have been committed.
       // Tmp chunk files won't exist here.
-      byte[] data = chunkManager.readChunk(kvContainer, blockID,
+      ByteBuffer data = chunkManager.readChunk(kvContainer, blockID,
           ChunkInfo.getFromProtoBuf(chunk), dispatcherContext);
-      ByteString current = ByteString.copyFrom(data);
+      ByteString current = byteBufferToByteString.apply(data);
       dataBuf = dataBuf.concat(current);
       chunkInfo = chunk;
     }

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java

Lines changed: 2 additions & 32 deletions
@@ -24,9 +24,6 @@
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ReadChunkResponseProto;
-import org.apache.hadoop.hdds.scm.ByteStringHelper;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.io.IOUtils;

@@ -142,8 +139,7 @@ public static void writeData(File chunkFile, ChunkInfo chunkInfo,
   * @return ByteBuffer
   */
  public static ByteBuffer readData(File chunkFile, ChunkInfo data,
-      VolumeIOStats volumeIOStats) throws StorageContainerException,
-      ExecutionException, InterruptedException {
+      VolumeIOStats volumeIOStats) throws StorageContainerException {
    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
 
    if (!chunkFile.exists()) {

@@ -168,6 +164,7 @@ public static ByteBuffer readData(File chunkFile, ChunkInfo data,
 
     try (FileLock ignored = file.lock(offset, len, true)) {
       file.read(buf, offset);
+      buf.flip();
     }
 
     // Increment volumeIO stats here.

@@ -287,33 +284,6 @@ public static ContainerCommandResponseProto getChunkResponseSuccess(
     return ContainerUtils.getSuccessResponse(msg);
   }
 
-  /**
-   * Gets a response to the read chunk calls.
-   *
-   * @param msg - Msg
-   * @param data - Data
-   * @param info - Info
-   * @return Response.
-   */
-  public static ContainerCommandResponseProto getReadChunkResponse(
-      ContainerCommandRequestProto msg, byte[] data, ChunkInfo info) {
-    Preconditions.checkNotNull(msg);
-    Preconditions.checkNotNull(data, "Chunk data is null");
-    Preconditions.checkNotNull(info, "Chunk Info is null");
-
-    ReadChunkResponseProto.Builder response =
-        ReadChunkResponseProto.newBuilder();
-    response.setChunkData(info.getProtoBufMessage());
-    response.setData(
-        ByteStringHelper.getByteString(data));
-    response.setBlockID(msg.getReadChunk().getBlockID());
-
-    ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getSuccessResponseBuilder(msg);
-    builder.setReadChunk(response);
-    return builder.build();
-  }
-
  @VisibleForTesting
  static <T, E extends Exception> T processFileExclusively(
      Path path, CheckedSupplier<T, E> op
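
The added buf.flip() is the subtle part of this hunk: a channel read leaves the buffer's position at the end of the bytes just read, so without the flip every downstream consumer (now including byteBufferToByteString.apply in KeyValueHandler) would see zero remaining bytes. A standalone JDK-only sketch of the semantics this relies on:

import java.nio.ByteBuffer;

final class FlipSemantics {
  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(8);
    buf.put((byte) 1).put((byte) 2); // position = 2, limit = 8: write mode
    buf.flip();                      // position = 0, limit = 2: read mode
    // Without flip(), remaining() would report the 6 unwritten bytes past
    // the data; after flip(), exactly the two written bytes are readable.
    System.out.println(buf.remaining()); // prints 2
  }
}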

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDummyImpl.java

Lines changed: 3 additions & 3 deletions
@@ -120,8 +120,8 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
    * TODO: Explore if we need to do that for ozone.
    */
   @Override
-  public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info,
-      DispatcherContext dispatcherContext) {
+  public ByteBuffer readChunk(Container container, BlockID blockID,
+      ChunkInfo info, DispatcherContext dispatcherContext) {
 
     long readStartTime = Time.monotonicNow();

@@ -138,7 +138,7 @@ public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info,
     volumeIOStats.incReadOpCount();
     volumeIOStats.incReadBytes(info.getLen());
 
-    return data.array();
+    return data;
   }
 
   /**
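
Returning the ByteBuffer instead of data.array() is more than a type change: array() exposes the whole backing array (ignoring position and limit) and is unsupported for direct buffers, while the buffer itself carries both bounds. A JDK-only illustration of the difference:

import java.nio.ByteBuffer;

final class ArrayVsBuffer {
  public static void main(String[] args) {
    ByteBuffer direct = ByteBuffer.allocateDirect(4);
    System.out.println(direct.hasArray());   // false: array() would throw

    ByteBuffer heap = ByteBuffer.allocate(8);
    heap.put((byte) 42).flip();
    System.out.println(heap.array().length); // 8, not the 1 byte written
    System.out.println(heap.remaining());    // 1: the buffer tracks limits
  }
}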
