Skip to content

Commit 6419900

Browse files
committed
HDFS-9040. Erasure coding: coordinate data streamers in DFSStripedOutputStream. Contributed by Jing Zhao and Walter Su.
1 parent c09dc25 commit 6419900

File tree

18 files changed

+1068
-938
lines changed

18 files changed

+1068
-938
lines changed

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
@InterfaceStability.Evolving
3939
public class DatanodeID implements Comparable<DatanodeID> {
4040
public static final DatanodeID[] EMPTY_ARRAY = {};
41+
public static final DatanodeID EMPTY_DATANODE_ID = new DatanodeID("null",
42+
"null", "null", 0, 0, 0, 0);
4143

4244
private String ipAddr; // IP address
4345
private String hostName; // hostname claimed by datanode

hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,3 +450,6 @@
450450

451451
HDFS-8882. Erasure Coding: Use datablocks, parityblocks and cell size from
452452
ErasureCodingPolicy (Vinayakumar B via zhz)
453+
454+
HDFS-9040. Erasure coding: coordinate data streamers in
455+
DFSStripedOutputStream. (jing9 and Walter Su)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
5252
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
5353
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
54+
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
5455
import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
5556
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
5657
import org.apache.hadoop.hdfs.util.ByteArrayManager;
@@ -212,14 +213,17 @@ private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
212213
/** Construct a new output stream for creating a file. */
213214
protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
214215
EnumSet<CreateFlag> flag, Progressable progress,
215-
DataChecksum checksum, String[] favoredNodes) throws IOException {
216+
DataChecksum checksum, String[] favoredNodes, boolean createStreamer)
217+
throws IOException {
216218
this(dfsClient, src, progress, stat, checksum);
217219
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
218220

219221
computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum);
220222

221-
streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum,
222-
cachingStrategy, byteArrayManager, favoredNodes);
223+
if (createStreamer) {
224+
streamer = new DataStreamer(stat, null, dfsClient, src, progress,
225+
checksum, cachingStrategy, byteArrayManager, favoredNodes);
226+
}
223227
}
224228

225229
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
@@ -276,7 +280,7 @@ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
276280
flag, progress, checksum, favoredNodes);
277281
} else {
278282
out = new DFSOutputStream(dfsClient, src, stat,
279-
flag, progress, checksum, favoredNodes);
283+
flag, progress, checksum, favoredNodes, true);
280284
}
281285
out.start();
282286
return out;
@@ -476,7 +480,7 @@ protected void adjustChunkBoundary() {
476480
*
477481
* @throws IOException
478482
*/
479-
protected void endBlock() throws IOException {
483+
void endBlock() throws IOException {
480484
if (getStreamer().getBytesCurBlock() == blockSize) {
481485
setCurrentPacketToEmpty();
482486
enqueueCurrentPacket();
@@ -921,4 +925,52 @@ protected DataStreamer getStreamer() {
921925
public String toString() {
922926
return getClass().getSimpleName() + ":" + streamer;
923927
}
928+
929+
static LocatedBlock addBlock(DatanodeInfo[] excludedNodes, DFSClient dfsClient,
930+
String src, ExtendedBlock prevBlock, long fileId, String[] favoredNodes)
931+
throws IOException {
932+
final DfsClientConf conf = dfsClient.getConf();
933+
int retries = conf.getNumBlockWriteLocateFollowingRetry();
934+
long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
935+
long localstart = Time.monotonicNow();
936+
while (true) {
937+
try {
938+
return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock,
939+
excludedNodes, fileId, favoredNodes);
940+
} catch (RemoteException e) {
941+
IOException ue = e.unwrapRemoteException(FileNotFoundException.class,
942+
AccessControlException.class,
943+
NSQuotaExceededException.class,
944+
DSQuotaExceededException.class,
945+
QuotaByStorageTypeExceededException.class,
946+
UnresolvedPathException.class);
947+
if (ue != e) {
948+
throw ue; // no need to retry these exceptions
949+
}
950+
if (NotReplicatedYetException.class.getName().equals(e.getClassName())) {
951+
if (retries == 0) {
952+
throw e;
953+
} else {
954+
--retries;
955+
LOG.info("Exception while adding a block", e);
956+
long elapsed = Time.monotonicNow() - localstart;
957+
if (elapsed > 5000) {
958+
LOG.info("Waiting for replication for " + (elapsed / 1000)
959+
+ " seconds");
960+
}
961+
try {
962+
LOG.warn("NotReplicatedYetException sleeping " + src
963+
+ " retries left " + retries);
964+
Thread.sleep(sleeptime);
965+
sleeptime *= 2;
966+
} catch (InterruptedException ie) {
967+
LOG.warn("Caught exception", ie);
968+
}
969+
}
970+
} else {
971+
throw e;
972+
}
973+
}
974+
}
975+
}
924976
}

0 commit comments

Comments
 (0)