Skip to content

Commit 787ac74

Browse files
committed
HDDS-2169. Avoid buffer copies while submitting client requests in Ratis.
1 parent f209722 commit 787ac74

File tree

6 files changed

+291
-37
lines changed

6 files changed

+291
-37
lines changed

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java

Lines changed: 10 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
4242
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
4343
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
44+
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
4445
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
4546
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
4647
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
@@ -56,7 +57,6 @@
5657
import org.apache.ratis.retry.RetryPolicy;
5758
import org.apache.ratis.rpc.RpcType;
5859
import org.apache.ratis.rpc.SupportedRpcType;
59-
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
6060
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
6161
import org.apache.ratis.util.TimeDuration;
6262
import org.slf4j.Logger;
@@ -219,39 +219,16 @@ private CompletableFuture<RaftClientReply> sendRequestAsync(
219219
try (Scope scope = GlobalTracer.get()
220220
.buildSpan("XceiverClientRatis." + request.getCmdType().name())
221221
.startActive(true)) {
222-
ContainerCommandRequestProto finalPayload =
223-
ContainerCommandRequestProto.newBuilder(request)
224-
.setTraceID(TracingUtil.exportCurrentSpan())
225-
.build();
226-
boolean isReadOnlyRequest = HddsUtils.isReadOnly(finalPayload);
227-
ByteString byteString = finalPayload.toByteString();
228-
if (LOG.isDebugEnabled()) {
229-
LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest,
230-
sanitizeForDebug(finalPayload));
222+
final ContainerCommandRequestMessage message
223+
= ContainerCommandRequestMessage.toMessage(
224+
request, TracingUtil.exportCurrentSpan());
225+
if (HddsUtils.isReadOnly(request)) {
226+
LOG.debug("sendCommandAsync ReadOnly {}", message);
227+
return getClient().sendReadOnlyAsync(message);
228+
} else {
229+
LOG.debug("sendCommandAsync {}", message);
230+
return getClient().sendAsync(message);
231231
}
232-
return isReadOnlyRequest ?
233-
getClient().sendReadOnlyAsync(() -> byteString) :
234-
getClient().sendAsync(() -> byteString);
235-
}
236-
}
237-
238-
private ContainerCommandRequestProto sanitizeForDebug(
239-
ContainerCommandRequestProto request) {
240-
switch (request.getCmdType()) {
241-
case PutSmallFile:
242-
return request.toBuilder()
243-
.setPutSmallFile(request.getPutSmallFile().toBuilder()
244-
.clearData()
245-
)
246-
.build();
247-
case WriteChunk:
248-
return request.toBuilder()
249-
.setWriteChunk(request.getWriteChunk().toBuilder()
250-
.clearData()
251-
)
252-
.build();
253-
default:
254-
return request;
255232
}
256233
}
257234

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdds.ratis;
19+
20+
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
21+
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
22+
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
23+
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
24+
import org.apache.ratis.protocol.Message;
25+
import org.apache.ratis.protocol.RaftGroupId;
26+
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
27+
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
28+
import org.apache.ratis.util.JavaUtils;
29+
30+
import java.util.Objects;
31+
import java.util.function.Supplier;
32+
33+
/**
34+
* Implementing the {@link Message} interface
35+
* for {@link ContainerCommandRequestProto}.
36+
*/
37+
/**
 * Implementing the {@link Message} interface
 * for {@link ContainerCommandRequestProto}.
 *
 * Wire format of {@link #getContent()}:
 *   [4-byte header length][serialized header proto][raw data payload]
 * The bulk data of WriteChunk/PutSmallFile requests is stripped out of the
 * proto header and appended verbatim, so that submitting the request to
 * Ratis avoids re-copying the (potentially large) data buffer
 * (see HDDS-2169).
 */
public final class ContainerCommandRequestMessage implements Message {
  /**
   * Wraps a request as a Ratis {@link Message}, pulling the bulk data out
   * of the proto header so it can be concatenated after the serialized
   * header instead of being serialized inside it.
   *
   * @param request the container command to wrap
   * @param traceId tracing span id to embed in the header; ignored if null
   */
  public static ContainerCommandRequestMessage toMessage(
      ContainerCommandRequestProto request, String traceId) {
    final ContainerCommandRequestProto.Builder b
        = ContainerCommandRequestProto.newBuilder(request);
    if (traceId != null) {
      b.setTraceID(traceId);
    }

    // Strip the data payload out of the header for the two bulk-data
    // request types; all other types carry no separate payload.
    ByteString data = ByteString.EMPTY;
    if (request.getCmdType() == Type.WriteChunk) {
      final WriteChunkRequestProto w = request.getWriteChunk();
      data = w.getData();
      b.setWriteChunk(w.toBuilder().clearData());
    } else if (request.getCmdType() == Type.PutSmallFile) {
      final PutSmallFileRequestProto p = request.getPutSmallFile();
      data = p.getData();
      // NOTE(review): PutSmallFile sets EMPTY instead of clearData() —
      // presumably data is a required field in the .proto; confirm.
      b.setPutSmallFile(p.toBuilder().setData(ByteString.EMPTY));
    }
    return new ContainerCommandRequestMessage(b.build(), data);
  }

  /**
   * Parses the wire format produced by {@link #getContent()} back into a
   * complete {@link ContainerCommandRequestProto}, re-attaching the data
   * payload to the header.
   *
   * @param bytes length-prefixed header followed by the raw data payload
   * @param groupId if non-null, overwrites the header's pipeline id
   * @throws InvalidProtocolBufferException if the header cannot be parsed
   */
  public static ContainerCommandRequestProto toProto(
      ByteString bytes, RaftGroupId groupId)
      throws InvalidProtocolBufferException {
    // First 4 bytes hold the header length; the header occupies [4, i).
    final int i = 4 + bytes.asReadOnlyByteBuffer().getInt();
    final ContainerCommandRequestProto header
        = ContainerCommandRequestProto.parseFrom(bytes.substring(4, i));
    // TODO: setting pipeline id can be avoided if the client is sending it.
    // In such case, just have to validate the pipeline id.
    final ContainerCommandRequestProto.Builder b = header.toBuilder();
    if (groupId != null) {
      b.setPipelineID(groupId.getUuid().toString());
    }
    // Everything after the header is the stripped-out data payload.
    final ByteString data = bytes.substring(i);
    if (header.getCmdType() == Type.WriteChunk) {
      b.setWriteChunk(b.getWriteChunkBuilder().setData(data));
    } else if (header.getCmdType() == Type.PutSmallFile) {
      b.setPutSmallFile(b.getPutSmallFileBuilder().setData(data));
    }
    return b.build();
  }

  // Request proto with the bulk data removed.
  private final ContainerCommandRequestProto header;
  // The stripped-out bulk data (EMPTY for non-bulk request types).
  private final ByteString data;
  // Serialized content is built lazily, once, and cached.
  private final Supplier<ByteString> contentSupplier
      = JavaUtils.memoize(this::buildContent);

  private ContainerCommandRequestMessage(
      ContainerCommandRequestProto header, ByteString data) {
    this.header = Objects.requireNonNull(header, "header == null");
    this.data = Objects.requireNonNull(data, "data == null");
  }

  /** Builds [header length][header bytes][data] via ByteString concat. */
  private ByteString buildContent() {
    final ByteString headerBytes = header.toByteString();
    return RatisHelper.int2ByteString(headerBytes.size())
        .concat(headerBytes)
        .concat(data);
  }

  @Override
  public ByteString getContent() {
    return contentSupplier.get();
  }

  @Override
  public String toString() {
    return header + ", data.size=" + data.size();
  }
}

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.hdds.ratis;
2020

21+
import java.io.DataOutputStream;
2122
import java.io.IOException;
2223
import java.security.cert.CertificateException;
2324
import java.security.cert.X509Certificate;
@@ -272,4 +273,15 @@ static Long getMinReplicatedIndex(
272273
return commitInfos.stream().map(RaftProtos.CommitInfoProto::getCommitIndex)
273274
.min(Long::compareTo).orElse(null);
274275
}
276+
277+
static ByteString int2ByteString(int n) {
278+
final ByteString.Output out = ByteString.newOutput();
279+
try(DataOutputStream dataOut = new DataOutputStream(out)) {
280+
dataOut.writeInt(n);
281+
} catch (IOException e) {
282+
throw new IllegalStateException(
283+
"Failed to write integer n = " + n + " to a ByteString.", e);
284+
}
285+
return out.toByteString();
286+
}
275287
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdds.ratis;
19+
20+
import org.apache.hadoop.hdds.client.BlockID;
21+
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
22+
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
23+
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
24+
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
25+
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto;
26+
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
27+
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
28+
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
29+
import org.apache.hadoop.ozone.common.Checksum;
30+
import org.apache.hadoop.ozone.common.ChecksumData;
31+
import org.apache.hadoop.ozone.common.OzoneChecksumException;
32+
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
33+
import org.junit.Assert;
34+
import org.junit.Test;
35+
36+
import java.util.Random;
37+
import java.util.UUID;
38+
import java.util.function.BiFunction;
39+
40+
/** Testing {@link ContainerCommandRequestMessage}. */
41+
public class TestContainerCommandRequestMessage {
42+
static final Random RANDOM = new Random();
43+
44+
static ByteString newData(int length, Random random) {
45+
final ByteString.Output out = ByteString.newOutput();
46+
for(int i = 0; i < length; i++) {
47+
out.write(random.nextInt());
48+
}
49+
return out.toByteString();
50+
}
51+
52+
static ChecksumData checksum(ByteString data) {
53+
try {
54+
return new Checksum().computeChecksum(data.toByteArray());
55+
} catch (OzoneChecksumException e) {
56+
throw new IllegalStateException(e);
57+
}
58+
}
59+
60+
static ContainerCommandRequestProto newPutSmallFile(
61+
BlockID blockID, ByteString data) {
62+
final BlockData.Builder blockData
63+
= BlockData.newBuilder()
64+
.setBlockID(blockID.getDatanodeBlockIDProtobuf());
65+
final PutBlockRequestProto.Builder putBlockRequest
66+
= PutBlockRequestProto.newBuilder()
67+
.setBlockData(blockData);
68+
final KeyValue keyValue = KeyValue.newBuilder()
69+
.setKey("OverWriteRequested")
70+
.setValue("true")
71+
.build();
72+
final ChunkInfo chunk = ChunkInfo.newBuilder()
73+
.setChunkName(blockID.getLocalID() + "_chunk")
74+
.setOffset(0)
75+
.setLen(data.size())
76+
.addMetadata(keyValue)
77+
.setChecksumData(checksum(data).getProtoBufMessage())
78+
.build();
79+
final PutSmallFileRequestProto putSmallFileRequest
80+
= PutSmallFileRequestProto.newBuilder()
81+
.setChunkInfo(chunk)
82+
.setBlock(putBlockRequest)
83+
.setData(data)
84+
.build();
85+
return ContainerCommandRequestProto.newBuilder()
86+
.setCmdType(Type.PutSmallFile)
87+
.setContainerID(blockID.getContainerID())
88+
.setDatanodeUuid(UUID.randomUUID().toString())
89+
.setPutSmallFile(putSmallFileRequest)
90+
.build();
91+
}
92+
93+
static ContainerCommandRequestProto newWriteChunk(
94+
BlockID blockID, ByteString data) {
95+
final ChunkInfo chunk = ChunkInfo.newBuilder()
96+
.setChunkName(blockID.getLocalID() + "_chunk_" + 1)
97+
.setOffset(0)
98+
.setLen(data.size())
99+
.setChecksumData(checksum(data).getProtoBufMessage())
100+
.build();
101+
102+
final WriteChunkRequestProto.Builder writeChunkRequest
103+
= WriteChunkRequestProto.newBuilder()
104+
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
105+
.setChunkData(chunk)
106+
.setData(data);
107+
return ContainerCommandRequestProto.newBuilder()
108+
.setCmdType(Type.WriteChunk)
109+
.setContainerID(blockID.getContainerID())
110+
.setDatanodeUuid(UUID.randomUUID().toString())
111+
.setWriteChunk(writeChunkRequest)
112+
.build();
113+
}
114+
115+
@Test
116+
public void testPutSmallFile() throws Exception {
117+
runTest(TestContainerCommandRequestMessage::newPutSmallFile);
118+
}
119+
120+
@Test
121+
public void testWriteChunk() throws Exception {
122+
runTest(TestContainerCommandRequestMessage::newWriteChunk);
123+
}
124+
125+
static void runTest(
126+
BiFunction<BlockID, ByteString, ContainerCommandRequestProto> method)
127+
throws Exception {
128+
for(int i = 0; i < 2; i++) {
129+
runTest(i, method);
130+
}
131+
for(int i = 2; i < 1 << 10;) {
132+
runTest(i + 1 + RANDOM.nextInt(i - 1), method);
133+
i <<= 1;
134+
runTest(i, method);
135+
}
136+
}
137+
138+
static void runTest(int length,
139+
BiFunction<BlockID, ByteString, ContainerCommandRequestProto> method)
140+
throws Exception {
141+
System.out.println("length=" + length);
142+
final BlockID blockID = new BlockID(RANDOM.nextLong(), RANDOM.nextLong());
143+
final ByteString data = newData(length, RANDOM);
144+
145+
final ContainerCommandRequestProto original = method.apply(blockID, data);
146+
final ContainerCommandRequestMessage message
147+
= ContainerCommandRequestMessage.toMessage(original, null);
148+
final ContainerCommandRequestProto computed
149+
= ContainerCommandRequestMessage.toProto(message.getContent(), null);
150+
Assert.assertEquals(original, computed);
151+
}
152+
}

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.hadoop.hdds.HddsUtils;
2828
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
2929

30+
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
3031
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
3132
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
3233
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -313,7 +314,7 @@ public TransactionContext startTransaction(RaftClientRequest request)
313314
throws IOException {
314315
long startTime = Time.monotonicNowNanos();
315316
final ContainerCommandRequestProto proto =
316-
getContainerCommandRequestProto(request.getMessage().getContent());
317+
message2ContainerCommandRequestProto(request.getMessage());
317318
Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
318319
try {
319320
dispatcher.validateContainerCommand(proto);
@@ -363,7 +364,7 @@ public TransactionContext startTransaction(RaftClientRequest request)
363364
.setStateMachine(this)
364365
.setServerRole(RaftPeerRole.LEADER)
365366
.setStateMachineContext(startTime)
366-
.setLogData(request.getMessage().getContent())
367+
.setLogData(proto.toByteString())
367368
.build();
368369
}
369370

@@ -383,6 +384,11 @@ private ContainerCommandRequestProto getContainerCommandRequestProto(
383384
.setPipelineID(gid.getUuid().toString()).build();
384385
}
385386

387+
/**
 * Decodes a Ratis {@link Message} (in the ContainerCommandRequestMessage
 * wire format) into a {@link ContainerCommandRequestProto}, stamping this
 * state machine's group id as the pipeline id.
 *
 * @throws InvalidProtocolBufferException if the embedded header is invalid
 */
private ContainerCommandRequestProto message2ContainerCommandRequestProto(
    Message message) throws InvalidProtocolBufferException {
  return ContainerCommandRequestMessage.toProto(message.getContent(), gid);
}
391+
386392
private ContainerCommandResponseProto dispatchCommand(
387393
ContainerCommandRequestProto requestProto, DispatcherContext context) {
388394
LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", gid,
@@ -530,7 +536,7 @@ public CompletableFuture<Message> query(Message request) {
530536
try {
531537
metrics.incNumQueryStateMachineOps();
532538
final ContainerCommandRequestProto requestProto =
533-
getContainerCommandRequestProto(request.getContent());
539+
message2ContainerCommandRequestProto(request);
534540
return CompletableFuture
535541
.completedFuture(runCommand(requestProto, null)::toByteString);
536542
} catch (IOException e) {

0 commit comments

Comments (0)