
Commit a0a6b4a

bshashikant authored and RogPodge committed
HDDS-1753. Datanode unable to find chunk while replication data using ratis. (apache#1318)
1 parent f574468 commit a0a6b4a

17 files changed: +564 −111 lines

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java

Lines changed: 6 additions & 0 deletions
@@ -242,4 +242,10 @@ static RetryPolicy createRetryPolicy(Configuration conf) {
         .retryUpToMaximumCountWithFixedSleep(maxRetryCount, sleepDuration);
     return retryPolicy;
   }
+
+  static Long getMinReplicatedIndex(
+      Collection<RaftProtos.CommitInfoProto> commitInfos) {
+    return commitInfos.stream().map(RaftProtos.CommitInfoProto::getCommitIndex)
+        .min(Long::compareTo).orElse(null);
+  }
 }
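
Note: the new helper returns the smallest commit index across a pipeline's replicas, or null when no commit info is available. A minimal sketch of that reduction, using plain longs so it runs without the Ratis protobuf classes (all names below are illustrative, not part of the patch):

// Illustrative sketch only: RatisHelper#getMinReplicatedIndex reduces
// CommitInfoProto objects; here plain longs stand in for the commit indices.
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class MinReplicatedIndexSketch {
  // Same reduction as the new helper: min over all replica commit indices.
  static Long minReplicatedIndex(Collection<Long> commitIndices) {
    return commitIndices.stream().min(Long::compareTo).orElse(null);
  }

  public static void main(String[] args) {
    // Commit indices reported by three replicas (made-up values).
    List<Long> indices = Arrays.asList(120L, 118L, 121L);
    // 118: the highest index known to be durable on every replica.
    System.out.println(minReplicatedIndex(indices));
    // An empty collection yields null, which callers map to -1.
    System.out.println(minReplicatedIndex(Collections.emptyList()));
  }
}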

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java

Lines changed: 5 additions & 0 deletions
@@ -544,4 +544,9 @@ public void computeAndSetChecksum(Yaml yaml) throws IOException {
    * @return Protocol Buffer Message
    */
   public abstract ContainerProtos.ContainerDataProto getProtoBufMessage();
+
+  /**
+   * Returns the blockCommitSequenceId.
+   */
+  public abstract long getBlockCommitSequenceId();
 }
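
Note: every concrete ContainerData must now expose its block commit sequence id (BCSID). A hypothetical subclass sketch, with an illustrative field and update method that are not taken from this patch:

// Hypothetical sketch -- not code from this commit. One way a concrete
// ContainerData could back the new abstract getter with a monotonically
// increasing field.
public class SketchContainerData {
  private volatile long blockCommitSequenceId;

  // Advance the BCSID as later blocks are committed; never move backwards.
  public void updateBlockCommitSequenceId(long bcsId) {
    if (bcsId > blockCommitSequenceId) {
      blockCommitSequenceId = bcsId;
    }
  }

  public long getBlockCommitSequenceId() {
    return blockCommitSequenceId;
  }
}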

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java

Lines changed: 6 additions & 15 deletions
@@ -27,16 +27,15 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common
-    .interfaces.ContainerDeletionChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
+import java.util.List;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -165,6 +164,10 @@ public Map<Long, Container> getContainerMapCopy() {
     return ImmutableMap.copyOf(containerMap);
   }
 
+  public Map<Long, Container> getContainerMap() {
+    return Collections.unmodifiableMap(containerMap);
+  }
+
   /**
    * A simple interface for container Iterations.
    * <p>
@@ -232,18 +235,6 @@ public ContainerReportsProto getContainerReport() throws IOException {
     return crBuilder.build();
   }
 
-  public List<ContainerData> chooseContainerForBlockDeletion(int count,
-      ContainerDeletionChoosingPolicy deletionPolicy)
-      throws StorageContainerException {
-    Map<Long, ContainerData> containerDataMap = containerMap.entrySet().stream()
-        .filter(e -> deletionPolicy.isValidContainerType(
-            e.getValue().getContainerType()))
-        .collect(Collectors.toMap(Map.Entry::getKey,
-            e -> e.getValue().getContainerData()));
-    return deletionPolicy
-        .chooseContainerForBlockDeletion(count, containerDataMap);
-  }
-
   public Set<Long> getMissingContainerSet() {
     return missingContainerSet;
   }
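
Note: unlike getContainerMapCopy(), which snapshots the map with ImmutableMap.copyOf, the new getContainerMap() hands out a read-only live view. A self-contained sketch of that difference (class and value types below are illustrative stand-ins):

// Sketch: live unmodifiable view vs. immutable snapshot; Strings stand
// in for Container instances.
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

public class ContainerMapViewSketch {
  private final ConcurrentSkipListMap<Long, String> containerMap =
      new ConcurrentSkipListMap<>();

  // Live view: reflects later changes, but rejects mutation by callers.
  public Map<Long, String> getContainerMap() {
    return Collections.unmodifiableMap(containerMap);
  }

  public static void main(String[] args) {
    ContainerMapViewSketch s = new ContainerMapViewSketch();
    Map<Long, String> view = s.getContainerMap();
    s.containerMap.put(1L, "container-1");
    System.out.println(view.size()); // 1 -- the view tracks the map
    // view.put(2L, "x");            // would throw UnsupportedOperationException
  }
}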

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java

Lines changed: 6 additions & 0 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
@@ -81,6 +82,11 @@ public CommandHandler getCloseContainerHandler() {
     return handlerMap.get(Type.closeContainerCommand);
   }
 
+  @VisibleForTesting
+  public CommandHandler getDeleteBlocksCommandHandler() {
+    return handlerMap.get(Type.deleteBlocksCommand);
+  }
+
   /**
    * Dispatch the command to the correct handler.
    *

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java

Lines changed: 5 additions & 0 deletions
@@ -171,6 +171,11 @@ public long getNumReadStateMachineMissCount() {
     return numReadStateMachineMissCount.value();
   }
 
+  @VisibleForTesting
+  public long getNumReadStateMachineOps() {
+    return numReadStateMachineOps.value();
+  }
+
   @VisibleForTesting
   public long getNumBytesWrittenCount() {
     return numBytesWrittenCount.value();

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

Lines changed: 18 additions & 9 deletions
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -518,7 +519,8 @@ public CompletableFuture<Message> query(Message request) {
   }
 
   private ByteString readStateMachineData(
-      ContainerCommandRequestProto requestProto, long term, long index) {
+      ContainerCommandRequestProto requestProto, long term, long index)
+      throws IOException {
     // the stateMachine data is not present in the stateMachine cache,
     // increment the stateMachine cache miss count
     metrics.incNumReadStateMachineMissCount();
@@ -532,18 +534,24 @@ private ByteString readStateMachineData(
             .setChunkData(chunkInfo);
     ContainerCommandRequestProto dataContainerCommandProto =
         ContainerCommandRequestProto.newBuilder(requestProto)
-            .setCmdType(Type.ReadChunk)
-            .setReadChunk(readChunkRequestProto)
+            .setCmdType(Type.ReadChunk).setReadChunk(readChunkRequestProto)
             .build();
     DispatcherContext context =
-        new DispatcherContext.Builder()
-            .setTerm(term)
-            .setLogIndex(index)
-            .setReadFromTmpFile(true)
-            .build();
+        new DispatcherContext.Builder().setTerm(term).setLogIndex(index)
+            .setReadFromTmpFile(true).build();
     // read the chunk
     ContainerCommandResponseProto response =
         dispatchCommand(dataContainerCommandProto, context);
+    if (response.getResult() != ContainerProtos.Result.SUCCESS) {
+      StorageContainerException sce =
+          new StorageContainerException(response.getMessage(),
+              response.getResult());
+      LOG.error("gid {} : ReadStateMachine failed. cmd {} logIndex {} msg : "
+          + "{} Container Result: {}", gid, response.getCmdType(), index,
+          response.getMessage(), response.getResult());
+      throw sce;
+    }
+
     ReadChunkResponseProto responseProto = response.getReadChunk();
 
     ByteString data = responseProto.getData();
@@ -746,7 +754,8 @@ private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
     return future;
   }
 
-  private void evictStateMachineCache() {
+  @VisibleForTesting
+  public void evictStateMachineCache() {
    stateMachineDataCache.invalidateAll();
    stateMachineDataCache.cleanUp();
   }
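
Note: this hunk is the heart of the fix. Previously readStateMachineData() used the dispatcher's response unconditionally, so a failed ReadChunk during Ratis-driven replication went unreported; now a non-SUCCESS result is turned into a StorageContainerException. A simplified sketch of that fail-fast pattern, with stand-in types instead of the Ozone protos:

// Simplified stand-ins for the proto types; the point is the control
// flow: verify the result before trusting the payload.
import java.io.IOException;

public class ReadChunkFailFastSketch {
  enum Result { SUCCESS, CONTAINER_NOT_FOUND, UNABLE_TO_FIND_CHUNK }

  static byte[] readChunk(Result result, byte[] payload, long logIndex)
      throws IOException {
    if (result != Result.SUCCESS) {
      // Pre-patch behaviour: fall through and hand back whatever the
      // failed response carried; post-patch: fail loudly instead.
      throw new IOException(
          "ReadStateMachine failed at logIndex " + logIndex + ": " + result);
    }
    return payload;
  }

  public static void main(String[] args) throws IOException {
    System.out.println(
        readChunk(Result.SUCCESS, new byte[]{1, 2, 3}, 42L).length); // 3
  }
}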

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java

Lines changed: 26 additions & 22 deletions
@@ -22,15 +22,11 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
@@ -50,14 +46,7 @@
 import org.apache.ratis.grpc.GrpcFactory;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.netty.NettyConfigKeys;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.NotLeaderException;
-import org.apache.ratis.protocol.StateMachineException;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.*;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
@@ -74,11 +63,11 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Collections;
 import java.util.UUID;
+import java.util.ArrayList;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -139,10 +128,10 @@ private XceiverServerRatis(DatanodeDetails dd, int port,
         TimeUnit.MILLISECONDS);
     this.dispatcher = dispatcher;
 
-    RaftServer.Builder builder = RaftServer.newBuilder()
-        .setServerId(RatisHelper.toRaftPeerId(dd))
-        .setProperties(serverProperties)
-        .setStateMachineRegistry(this::getStateMachine);
+    RaftServer.Builder builder =
+        RaftServer.newBuilder().setServerId(RatisHelper.toRaftPeerId(dd))
+            .setProperties(serverProperties)
+            .setStateMachineRegistry(this::getStateMachine);
     if (tlsConfig != null) {
       builder.setParameters(GrpcFactory.newRaftParameters(tlsConfig));
     }
@@ -507,6 +496,13 @@ private RaftClientRequest createRaftClientRequest(
         null);
   }
 
+  private GroupInfoRequest createGroupInfoRequest(
+      HddsProtos.PipelineID pipelineID) {
+    return new GroupInfoRequest(clientId, server.getId(),
+        RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineID).getId()),
+        nextCallId());
+  }
+
   private void handlePipelineFailure(RaftGroupId groupId,
       RoleInfoProto roleInfoProto) {
     String msg;
@@ -654,4 +650,12 @@ public void handleNodeLogFailure(RaftGroupId groupId, Throwable t) {
     triggerPipelineClose(groupId, msg,
         ClosePipelineInfo.Reason.PIPELINE_LOG_FAILED, true);
   }
+
+  public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException {
+    Long minIndex;
+    GroupInfoReply reply = getServer()
+        .getGroupInfo(createGroupInfoRequest(pipelineID.getProtobuf()));
+    minIndex = RatisHelper.getMinReplicatedIndex(reply.getCommitInfos());
+    return minIndex == null ? -1 : minIndex.longValue();
+  }
 }
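
Note: getMinReplicatedIndex(PipelineID) queries the local RaftServer for group info and reduces the peers' commit indices via the new RatisHelper method, returning -1 when no commit info exists. A hedged usage sketch; the interface and caller below are assumptions for illustration, not code from this commit:

// MinIndexSource stands in for XceiverServerRatis; safeToEvictUpTo is a
// hypothetical caller showing how the -1 sentinel is meant to be handled.
import java.io.IOException;

public class MinIndexUsageSketch {
  interface MinIndexSource {
    // Mirrors the new accessor's contract: -1 when no commit info exists.
    long getMinReplicatedIndex(String pipelineId) throws IOException;
  }

  static boolean safeToEvictUpTo(MinIndexSource server, String pipelineId,
      long logIndex) throws IOException {
    long minIndex = server.getMinReplicatedIndex(pipelineId);
    // Only indices replicated on every peer are safe to drop locally.
    return minIndex != -1 && logIndex <= minIndex;
  }
}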

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java

Lines changed: 2 additions & 26 deletions
@@ -25,7 +25,6 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
@@ -76,8 +75,6 @@
 import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
-import org.apache.hadoop.ozone.container.keyvalue.statemachine.background
-    .BlockDeletingService;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -86,15 +83,8 @@
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import static org.apache.hadoop.hdds.HddsConfigKeys
     .HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
+    Result.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,7 +99,6 @@ public class KeyValueHandler extends Handler {
   private final ContainerType containerType;
   private final BlockManager blockManager;
   private final ChunkManager chunkManager;
-  private final BlockDeletingService blockDeletingService;
   private final VolumeChoosingPolicy volumeChoosingPolicy;
   private final long maxContainerSize;
 
@@ -126,18 +115,6 @@ public KeyValueHandler(Configuration config, StateContext context,
         conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_KEY,
             OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_DEFAULT);
     chunkManager = new ChunkManagerImpl(doSyncWrite);
-    long svcInterval = config
-        .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
-            OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
-            TimeUnit.MILLISECONDS);
-    long serviceTimeout = config
-        .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
-            OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
-            TimeUnit.MILLISECONDS);
-    this.blockDeletingService =
-        new BlockDeletingService(containerSet, svcInterval, serviceTimeout,
-            TimeUnit.MILLISECONDS, config);
-    blockDeletingService.start();
     volumeChoosingPolicy = ReflectionUtils.newInstance(conf.getClass(
         HDDS_DATANODE_VOLUME_CHOOSING_POLICY, RoundRobinVolumeChoosingPolicy
             .class, VolumeChoosingPolicy.class), conf);
@@ -160,7 +137,6 @@ public VolumeChoosingPolicy getVolumeChoosingPolicyForTesting() {
 
   @Override
   public void stop() {
-    blockDeletingService.shutdown();
   }
 
   @Override

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java

Lines changed: 2 additions & 1 deletion
@@ -207,7 +207,8 @@ public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info,
 
     // In case the chunk file does not exist but tmp chunk file exist,
     // read from tmp chunk file if readFromTmpFile is set to true
-    if (!chunkFile.exists() && dispatcherContext.isReadFromTmpFile()) {
+    if (!chunkFile.exists() && dispatcherContext != null
+        && dispatcherContext.isReadFromTmpFile()) {
       chunkFile = getTmpChunkFile(chunkFile, dispatcherContext);
     }
     data = ChunkUtils.readData(chunkFile, info, volumeIOStats);