Commit c57c308

apache#6. datanode write data
1 parent 4e9f927 commit c57c308

11 files changed, +86 −3 lines changed

hadoop-common-project/hadoop-common/src/main/conf/core-site.xml

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@
 <configuration>
   <property>
     <name>fs.defaultFS</name>
-    <value>hdfs://localhost:9000</value>
+    <value>hdfs://localhost/</value>
   </property>
 </configuration>

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java

Lines changed: 2 additions & 0 deletions
@@ -160,6 +160,8 @@ protected synchronized int flushBuffer(boolean keep,
     int partialLen = bufLen % sum.getBytesPerChecksum();
     int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
     if (lenToFlush != 0) {
+      // TODO the core code
+      // TODO HDFS file -> block (128 MB) -> packet (64 KB) = 127 chunks -> chunk (512 bytes) + chunk checksum (4 bytes)
       writeChecksumChunks(buf, 0, lenToFlush);
       if (!flushPartial || keep) {
         count = partialLen;

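The chunk/packet/block hierarchy described in the comment above can be checked with simple arithmetic. Below is a minimal sketch, assuming the default HDFS sizes (512-byte chunks, 4-byte CRC checksums, 64 KB write packets, 128 MB blocks); the class is illustrative only, not Hadoop code.

// Illustrative arithmetic for the chunk -> packet -> block hierarchy (assumed defaults).
public class PacketMathSketch {
  public static void main(String[] args) {
    final int bytesPerChecksum = 512;            // data bytes per chunk
    final int checksumSize = 4;                  // CRC bytes per chunk
    final int writePacketSize = 64 * 1024;       // target packet size, 64 KB
    final long blockSize = 128L * 1024 * 1024;   // block size, 128 MB

    int chunkWithChecksum = bytesPerChecksum + checksumSize;              // 516 bytes on the wire
    int chunksPerPacket = writePacketSize / chunkWithChecksum;            // 127 chunks
    long dataBytesPerPacket = (long) chunksPerPacket * bytesPerChecksum;  // 65,024 data bytes
    long packetsPerBlock = (blockSize + dataBytesPerPacket - 1) / dataBytesPerPacket;

    System.out.println("chunks per packet = " + chunksPerPacket);   // 127
    System.out.println("packets per block ~ " + packetsPerBlock);   // ~2065 (the comments below round this to 2048)
  }
}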
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

Lines changed: 2 additions & 0 deletions
@@ -1701,6 +1701,8 @@ public DFSOutputStream create(String src,
         src, masked, flag, createParent, replication, blockSize, progress,
         buffersize, dfsClientConf.createChecksum(checksumOpt),
         getFavoredNodesStr(favoredNodes));
+
+    // start lease management for this file
     beginFileLease(result.getFileId(), result);
     return result;
   }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

Lines changed: 43 additions & 2 deletions
@@ -224,6 +224,14 @@ private DFSPacket createHeartbeatPacket() throws InterruptedIOException {
   // it. When all the packets for a block are sent out and acks for each
   // if them are received, the DataStreamer closes the current block.
   //
+
+  /**
+   * TODO
+   * The DataStreamer service consumes the data queued for the pipeline.
+   * It asks the NameNode for a new block; the NameNode returns the block id and the DataNodes that will hold it.
+   * Once started, the service receives the packet stream; every packet carries a sequence number.
+   * When a complete block has been received and acknowledged, it responds and closes the current DataStreamer.
+   */
  class DataStreamer extends Daemon {
    private volatile boolean streamerClosed = false;
    private ExtendedBlock block; // its length is number of bytes acked
@@ -446,6 +454,12 @@ public void run() {
          if(DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("Allocating new block");
          }
+         // TODO step 1: set up the data pipeline
+         /**
+          * nextBlockOutputStream does two things:
+          * 1) asks the NameNode for a block
+          * 2) sets up the data pipeline
+          */
          setPipeline(nextBlockOutputStream());
          initDataStreaming();
        } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
@@ -1223,6 +1237,7 @@ private LocatedBlock nextBlockOutputStream() throws IOException {
          .keySet()
          .toArray(new DatanodeInfo[0]);
      block = oldBlock;
+     // TODO obtain the new block
      lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
      block = lb.getBlock();
      block.setNumBytes(0);
@@ -1233,6 +1248,7 @@ private LocatedBlock nextBlockOutputStream() throws IOException {

      //
      // Connect to first DataNode in the list.
+     // TODO connect to the first DataNode
      //
      success = createBlockOutputStream(nodes, storageTypes, 0L, false);

@@ -1281,10 +1297,13 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
      try {
        assert null == s : "Previous socket unclosed";
        assert null == blockReplyStream : "Previous blockReplyStream unclosed";
+       // create the socket
        s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
        long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
-
+
+       // output stream: writes data to the DataNode
        OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
+       // input stream: reads back the DataNode's response
        InputStream unbufIn = NetUtils.getInputStream(s);
        IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
            unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
@@ -1307,13 +1326,18 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,

        boolean[] targetPinnings = getPinnings(nodes, true);
        // send the request
+       // TODO send the write-block request
+       // this is a socket request
+       // the DataNode runs a DataXceiverServer service that accepts socket requests
+
        new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
            dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
            nodes.length, block.getNumBytes(), bytesSent, newGS,
            checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
            (targetPinnings == null ? false : targetPinnings[0]), targetPinnings);

        // receive ack for connect
+       // TODO wait for the DataNode's response
        BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
            PBHelper.vintPrefixed(blockReplyStream));
        pipelineStatus = resp.getStatus();
@@ -1427,6 +1451,7 @@ private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws
      long localstart = Time.monotonicNow();
      while (true) {
        try {
+         // TODO ask the NameNode for the block to write to
          return dfsClient.namenode.addBlock(src, dfsClient.clientName,
              block, excludedNodes, fileId, favoredNodes);
        } catch (RemoteException e) {
@@ -1598,6 +1623,7 @@ private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,

    computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);

+   // TODO create the DataStreamer
    streamer = new DataStreamer(stat, null);
    if (favoredNodes != null && favoredNodes.length != 0) {
      streamer.setFavoredNodes(favoredNodes);
@@ -1620,6 +1646,11 @@ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    while (shouldRetry) {
      shouldRetry = false;
      try {
+       /**
+        * Uploading a file:
+        * 1) create the file
+        * 2) add the lease
+        */
        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
            new EnumSetWritable<CreateFlag>(flag), createParent, replication,
            blockSize, SUPPORTED_CRYPTO_VERSIONS);
@@ -1651,6 +1682,7 @@ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
        }
      }
      Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
+     // TODO create the output stream
      final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
          flag, progress, checksum, favoredNodes);
      out.start();
@@ -1752,6 +1784,7 @@ private void waitAndQueueCurrentPacket() throws IOException {
            firstWait = false;
          }
          try {
+           // if the queue is full, wait
            dataQueue.wait();
          } catch (InterruptedException e) {
            // If we get interrupted while waiting to queue data, we still need to get rid
@@ -1772,6 +1805,7 @@ private void waitAndQueueCurrentPacket() throws IOException {
        }
      }
      checkClosed();
+     // TODO add the packet to the queue
      queueCurrentPacket();
    } catch (ClosedChannelException e) {
    }
@@ -1807,6 +1841,7 @@ private synchronized void writeChunkImpl(byte[] b, int offset, int len,
    }

    if (currentPacket == null) {
+     // TODO create a packet
      currentPacket = createPacket(packetSize, chunksPerPacket,
          bytesCurBlock, currentSeqno++, false);
      if (DFSClient.LOG.isDebugEnabled()) {
@@ -1818,14 +1853,20 @@ private synchronized void writeChunkImpl(byte[] b, int offset, int len,
            ", bytesCurBlock=" + bytesCurBlock);
      }
    }
-
+   // TODO write the chunk's 4-byte checksum into the packet
    currentPacket.writeChecksum(checksum, ckoff, cklen);
    currentPacket.writeData(b, offset, len);
    currentPacket.incNumChunks();
+   // once 128 MB has been written, the block is full
    bytesCurBlock += len;

    // If packet is full, enqueue it for transmission
    //
+   /**
+    * TODO two conditions:
+    * 1. the packet is full (127 chunks = one packet)
+    * 2. the block is full (128 MB, roughly 2048 packets)
+    */
    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
        bytesCurBlock == blockSize) {
      if (DFSClient.LOG.isDebugEnabled()) {

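Taken together, the hunks above describe a bounded producer/consumer hand-off: writeChunkImpl fills the current packet, enqueues it on dataQueue once it holds 127 chunks or the block reaches 128 MB, waits when the queue is full, and the DataStreamer thread drains the queue and sends packets down the pipeline. Below is a minimal, self-contained sketch of that pattern; the class, field names, and queue limit are assumptions for illustration, not the DFSOutputStream API.

// Illustrative sketch of the packet queue between writeChunk() and the streamer
// thread; all names and the queue limit are assumptions taken from the comments above.
import java.util.ArrayDeque;
import java.util.Deque;

public class WritePipelineSketch {
  static final int MAX_CHUNKS_PER_PACKET = 127;
  static final long BLOCK_SIZE = 128L * 1024 * 1024;
  static final int MAX_QUEUED_PACKETS = 80;        // back-pressure limit (assumed)

  static class Packet {
    int numChunks;
    final StringBuilder data = new StringBuilder(); // stand-in for chunk bytes + checksums
  }

  private final Deque<Packet> dataQueue = new ArrayDeque<>();
  private Packet currentPacket;
  private long bytesCurBlock;

  // Producer side: mirrors writeChunkImpl's two flush conditions.
  synchronized void writeChunk(byte[] chunk, byte[] checksum) throws InterruptedException {
    if (currentPacket == null) {
      currentPacket = new Packet();                // "TODO create a packet"
    }
    currentPacket.data.append('.');                // pretend to copy checksum + data
    currentPacket.numChunks++;
    bytesCurBlock += chunk.length;

    boolean packetFull = currentPacket.numChunks == MAX_CHUNKS_PER_PACKET;
    boolean blockFull = bytesCurBlock >= BLOCK_SIZE;
    if (packetFull || blockFull) {
      while (dataQueue.size() >= MAX_QUEUED_PACKETS) {
        wait();                                    // "if the queue is full, wait"
      }
      dataQueue.addLast(currentPacket);            // enqueue for the streamer thread
      notifyAll();
      currentPacket = null;
      if (blockFull) {
        bytesCurBlock = 0;                         // the next packet starts a new block
      }
    }
  }

  // Consumer side: a DataStreamer-like loop would take packets from here and
  // send them down the DataNode pipeline.
  synchronized Packet takePacket() throws InterruptedException {
    while (dataQueue.isEmpty()) {
      wait();
    }
    Packet p = dataQueue.removeFirst();
    notifyAll();
    return p;
  }
}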
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

Lines changed: 5 additions & 0 deletions
@@ -445,6 +445,11 @@ public FSDataOutputStream create(final Path f, final FsPermission permission,
      @Override
      public FSDataOutputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
+       /**
+        * 1) adds an INodeFile to the namespace directory tree
+        * 2) adds the lease
+        * 3) starts the DataStreamer (the key service of the write path)
+        */
        final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
            cflags, replication, blockSize, progress, bufferSize,
            checksumOpt);

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

Lines changed: 1 addition & 0 deletions
@@ -132,6 +132,7 @@ public void run() {
    Peer peer = null;
    while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
      try {
+       // TODO accept the socket request
        peer = peerServer.accept();

        // Make sure the xceiver count is not exceeded

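The hunk above is the server side of the write-block request sent by Sender.writeBlock: one thread accepts connections and hands each one to a worker. Below is a minimal sketch of that accept-loop pattern using a plain ServerSocket; the real class uses a PeerServer, spawns a DataXceiver per connection, and enforces an xceiver-count limit, so all names here are illustrative only.

// Illustrative accept-loop in the style of DataXceiverServer; not the Hadoop class.
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class AcceptLoopSketch implements Runnable {
  private final ServerSocket server;
  private volatile boolean shouldRun = true;

  public AcceptLoopSketch(int port) throws IOException {
    this.server = new ServerSocket(port);
  }

  @Override
  public void run() {
    while (shouldRun) {
      try {
        Socket peer = server.accept();             // block until a client connects
        new Thread(() -> handle(peer)).start();    // one worker thread per connection
      } catch (IOException e) {
        if (shouldRun) {
          e.printStackTrace();
        }
      }
    }
  }

  private void handle(Socket peer) {
    // A real DataXceiver reads the op code (e.g. WRITE_BLOCK), streams packets to
    // disk, and forwards them to the next DataNode in the pipeline; here we just close.
    try {
      peer.close();
    } catch (IOException ignored) {
    }
  }
}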
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Lines changed: 8 additions & 0 deletions
@@ -2436,6 +2436,7 @@ private HdfsFileStatus startFileInt(final String srcArg,
    try {
      src = dir.resolvePath(pc, src, pathComponents);
      final INodesInPath iip = dir.getINodesInPath4Write(src);
+     // TODO the important code
      toRemoveBlocks = startFileInternal(
          pc, iip, permissions, holder,
          clientMachine, create, overwrite,
@@ -2561,6 +2562,8 @@ private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc,
    Map.Entry<INodesInPath, String> parent = FSDirMkdirOp
        .createAncestorDirectories(dir, iip, permissions);
    if (parent != null) {
+     // TODO add an INodeFile node to the namespace directory tree
+     // dir is the FSDirectory
      iip = dir.addFile(parent.getKey(), parent.getValue(), permissions,
          replication, blockSize, holder, clientMachine);
      newNode = iip != null ? iip.getLastINode().asFile() : null;
@@ -2569,6 +2572,9 @@ private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc,
    if (newNode == null) {
      throw new IOException("Unable to add " + src + " to namespace");
    }
+   // TODO add the lease
+   // only the client holding the lease may upload data
+   // the lease is renewed at regular intervals
    leaseManager.addLease(newNode.getFileUnderConstructionFeature()
        .getClientName(), src);

@@ -3074,6 +3080,8 @@ LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
    }

    // choose targets for the new block to be allocated.
+   // TODO choose the DataNodes that will store the block (load balancing)
+   // HDFS rack awareness
    final DatanodeStorageInfo targets[] = getBlockManager().chooseTarget4NewBlock(
        src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
        storagePolicyID);

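The lease comments above capture the single-writer rule: only the client holding the lease may write to the file, the client renews the lease periodically, and stale leases are eventually reclaimed. Below is a minimal sketch of that idea; the class, method names, and timeout are assumptions for illustration, not the LeaseManager API.

// Illustrative single-writer lease table; names and the timeout are assumed.
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class LeaseSketch {
  private static final long SOFT_LIMIT_MS = 60_000;   // assumed renewal deadline

  private static class Lease {
    final String holder;        // client name
    long lastRenewedMs;
    Lease(String holder, long now) { this.holder = holder; this.lastRenewedMs = now; }
  }

  private final Map<String, Lease> leasesByPath = new HashMap<>();

  // Grant the lease when the file is created; refuse a second concurrent writer.
  synchronized void addLease(String holder, String path) {
    Lease existing = leasesByPath.get(path);
    long now = System.currentTimeMillis();
    if (existing != null && !existing.holder.equals(holder)
        && now - existing.lastRenewedMs < SOFT_LIMIT_MS) {
      throw new IllegalStateException(path + " is already being written by " + existing.holder);
    }
    leasesByPath.put(path, new Lease(holder, now));
  }

  // The client renews all of its leases from a background thread.
  synchronized void renewLease(String holder) {
    long now = System.currentTimeMillis();
    for (Lease lease : leasesByPath.values()) {
      if (lease.holder.equals(holder)) {
        lease.lastRenewedMs = now;
      }
    }
  }

  // A monitor thread would drop leases whose holder stopped renewing.
  synchronized void expireStaleLeases() {
    long now = System.currentTimeMillis();
    Iterator<Lease> it = leasesByPath.values().iterator();
    while (it.hasNext()) {
      if (now - it.next().lastRenewedMs >= SOFT_LIMIT_MS) {
        it.remove();
      }
    }
  }
}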
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

Lines changed: 7 additions & 0 deletions
@@ -619,6 +619,7 @@ public HdfsFileStatus create(String src, FsPermission masked,
    try {
      PermissionStatus perm = new PermissionStatus(getRemoteUser()
          .getShortUserName(), null, masked);
+     // TODO the core step
      status = namesystem.startFile(src, perm, clientName, clientMachine,
          flag.get(), createParent, replication, blockSize, supportedVersions,
          cacheEntry != null);
@@ -719,6 +720,12 @@ public LocatedBlock addBlock(String src, String clientName,
    }
    List<String> favoredNodesList = (favoredNodes == null) ? null
        : Arrays.asList(favoredNodes);
+
+   /**
+    * 1) choose three DataNodes to hold the replicas
+    * 2) update the directory tree (in memory)
+    * 3) persist the metadata (to disk)
+    */
    LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
        clientName, previous, excludedNodesSet, favoredNodesList);
    if (locatedBlock != null)

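The "rack awareness" mentioned for chooseTarget4NewBlock refers to HDFS's default placement: first replica on the writer's DataNode (or a random node), second on a different rack, third on the same rack as the second but a different node. Below is a minimal sketch of that idea under simplified assumptions (all live nodes known up front, no load or free-space checks); names are illustrative, not the BlockPlacementPolicy API.

// Simplified rack-aware choice of three replicas; illustration only.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PlacementSketch {
  /** nodeToRack maps every live DataNode name to its rack; assumes the map is non-empty. */
  static List<String> chooseThreeReplicas(Map<String, String> nodeToRack, String writerNode) {
    List<String> targets = new ArrayList<>(3);

    // 1st replica: the writer's own DataNode if it is one, otherwise any node.
    String first = nodeToRack.containsKey(writerNode)
        ? writerNode
        : nodeToRack.keySet().iterator().next();
    targets.add(first);
    String firstRack = nodeToRack.get(first);

    // 2nd replica: any node on a different rack.
    String second = nodeToRack.entrySet().stream()
        .filter(e -> !e.getValue().equals(firstRack))
        .map(Map.Entry::getKey)
        .findFirst()
        .orElse(null);
    if (second != null) {
      targets.add(second);
      String secondRack = nodeToRack.get(second);

      // 3rd replica: a different node on the same rack as the 2nd replica.
      nodeToRack.entrySet().stream()
          .filter(e -> e.getValue().equals(secondRack) && !e.getKey().equals(second))
          .map(Map.Entry::getKey)
          .findFirst()
          .ifPresent(targets::add);
    }
    return targets;   // may hold fewer than three entries on a very small cluster
  }
}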
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java

Lines changed: 3 additions & 0 deletions
@@ -240,6 +240,9 @@ public static void uploadImageFromStorage(URL fsName, Configuration conf,

  /*
   * Uploads the imagefile using HTTP PUT method
+  *
+  * For roughly 10 PB of stored data,
+  * the fsimage is on the order of 4 GB
   */
  private static void uploadImage(URL url, Configuration conf,
      NNStorage storage, NameNodeFile nnf, long txId, Canceler canceler)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java

Lines changed: 10 additions & 0 deletions
@@ -57,6 +57,12 @@
 * periodically waking up to take a checkpoint of the namespace.
 * When it takes a checkpoint, it saves it to its local
 * storage and then uploads it to the remote NameNode.
+ *
+ * namespace = metadata = directory tree = fsimage
+ *
+ * StandbyCheckpointer is a thread that runs on the standby NameNode.
+ * It periodically checkpoints the namespace
+ * and uploads that data to the active NameNode.
 */
@InterfaceAudience.Private
public class StandbyCheckpointer {
@@ -182,6 +188,7 @@ private void doCheckpoint() throws InterruptedException, IOException {
    } else {
      imageType = NameNodeFile.IMAGE;
    }
+   // TODO save the fsimage
    img.saveNamespace(namesystem, imageType, canceler);
    txid = img.getStorage().getMostRecentCheckpointTxId();
    assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
@@ -199,6 +206,7 @@ private void doCheckpoint() throws InterruptedException, IOException {
    // Upload the saved checkpoint back to the active
    // Do this in a separate thread to avoid blocking transition to active
    // See HDFS-4816
+   // TODO upload the fsimage to the active NameNode
    ExecutorService executor =
        Executors.newSingleThreadExecutor(uploadThreadFactory);
    Future<Void> upload = executor.submit(new Callable<Void>() {
@@ -320,7 +328,9 @@ private void doWork() {
      }

      final long now = monotonicNow();
+     // checkpoint condition 1: have more than 1,000,000 transactions gone un-checkpointed?
      final long uncheckpointed = countUncheckpointedTxns();
+     // checkpoint condition 2: has it been more than 3600 seconds since the last checkpoint?
      final long secsSinceLast = (now - lastCheckpointTime) / 1000;

      boolean needCheckpoint = needRollbackCheckpoint;

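The two comments above describe the checkpoint triggers: either too many transactions have accumulated since the last checkpoint, or too much time has passed. A minimal sketch of that decision, using the thresholds named in the comments (which correspond to the dfs.namenode.checkpoint.txns and dfs.namenode.checkpoint.period settings, defaulting to 1,000,000 transactions and 3600 seconds); the class is illustrative only.

// Illustrative checkpoint-trigger check; thresholds assumed from the comments above.
public class CheckpointTriggerSketch {
  static final long CHECKPOINT_TXNS = 1_000_000L;
  static final long CHECKPOINT_PERIOD_SECS = 3_600L;

  static boolean needCheckpoint(long uncheckpointedTxns, long secsSinceLastCheckpoint) {
    // condition 1: too many un-checkpointed transactions
    if (uncheckpointedTxns >= CHECKPOINT_TXNS) {
      return true;
    }
    // condition 2: too long since the last checkpoint
    return secsSinceLastCheckpoint >= CHECKPOINT_PERIOD_SECS;
  }

  public static void main(String[] args) {
    System.out.println(needCheckpoint(1_200_000, 100));  // true  (transaction threshold)
    System.out.println(needCheckpoint(50_000, 4_000));   // true  (time threshold)
    System.out.println(needCheckpoint(50_000, 100));     // false
  }
}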