
HDFS-16757. Add a new method copyBlockCrossNamespace to DataNode #6926


Open: wants to merge 22 commits into base: HDFS-2139
DFSClient.java
@@ -28,6 +28,7 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -1956,6 +1957,30 @@ protected IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
socketFactory, getConf().isConnectToDnViaHostname(), this, blockToken);
}

protected void copyBlockCrossNamespace(ExtendedBlock sourceBlk,
Token<BlockTokenIdentifier> sourceBlockToken, DatanodeInfo sourceDatanode,
ExtendedBlock targetBlk, Token<BlockTokenIdentifier> targetBlockToken,
DatanodeInfo targetDatanode) throws IOException {
IOStreamPair pair =
DFSUtilClient.connectToDN(sourceDatanode, getConf().getSocketTimeout(), conf, saslClient,
socketFactory, getConf().isConnectToDnViaHostname(), this, sourceBlockToken);
try {
new Sender((DataOutputStream) pair.out).copyBlockCrossNamespace(sourceBlk, sourceBlockToken,
targetBlk, targetBlockToken, targetDatanode);

pair.out.flush();

DataInputStream reply = new DataInputStream(pair.in);
BlockOpResponseProto proto =
BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(reply));
DataTransferProtoUtil.checkBlockOpStatus(proto,
"copyBlockCrossNamespace " + sourceBlk + " to " + targetBlk + " from " + sourceDatanode
+ " to " + targetDatanode);
} finally {
IOUtilsClient.cleanupWithLogger(LOG, pair.in, pair.out);
}
}

/**
* Infer the checksum type for a replica by sending an OP_READ_BLOCK
* for the first byte of that replica. This is used for compatibility
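For orientation, the reply handling above follows the same pattern as the other block ops: the DataNode answers with a vint-prefixed BlockOpResponseProto, and DataTransferProtoUtil.checkBlockOpStatus throws if the status is not SUCCESS. A simplified sketch of that check (not the verbatim implementation):

// Simplified sketch of DataTransferProtoUtil#checkBlockOpStatus; details
// differ in the real method, but any non-SUCCESS status becomes an exception.
static void checkBlockOpStatus(BlockOpResponseProto response, String logInfo)
    throws IOException {
  if (response.getStatus() != Status.SUCCESS) {
    if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
      // Surfaced as a distinct type so callers can fetch a fresh block token
      // and retry.
      throw new InvalidBlockTokenException("Got access token error, status "
          + "message " + response.getMessage() + ", " + logInfo);
    }
    throw new IOException("Got error, status=" + response.getStatus()
        + ", status message " + response.getMessage() + ", " + logInfo);
  }
}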
DFSOutputStream.java
@@ -131,6 +131,7 @@ public class DFSOutputStream extends FSOutputSummer
private FileEncryptionInfo fileEncryptionInfo;
private int writePacketSize;
private boolean leaseRecovered = false;
private ExtendedBlock userAssignmentLastBlock;

/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
@@ -949,6 +950,9 @@ protected void recoverLease(boolean recoverLeaseOnCloseException) {
void completeFile() throws IOException {
// get last block before destroying the streamer
ExtendedBlock lastBlock = getStreamer().getBlock();
if (lastBlock == null) {
lastBlock = getUserAssignmentLastBlock();
}
try (TraceScope ignored = dfsClient.getTracer()
.newScope("DFSOutputStream#completeFile")) {
completeFile(lastBlock);
@@ -1095,6 +1099,14 @@ ExtendedBlock getBlock() {
return getStreamer().getBlock();
}

public ExtendedBlock getUserAssignmentLastBlock() {
return userAssignmentLastBlock;
}

public void setUserAssignmentLastBlock(ExtendedBlock userAssignmentLastBlock) {
this.userAssignmentLastBlock = userAssignmentLastBlock;
}

@VisibleForTesting
public long getFileId() {
return fileId;
@@ -1199,4 +1211,16 @@ private static long calculateDelayForNextRetry(long previousDelay,
long maxDelay) {
return Math.min(previousDelay * 2, maxDelay);
}

public DFSClient getDfsClient() {
return dfsClient;
}

protected void copyBlockCrossNamespace(ExtendedBlock sourceBlk,
Token<BlockTokenIdentifier> sourceBlockToken, DatanodeInfo sourceDatanode,
ExtendedBlock targetBlk, Token<BlockTokenIdentifier> targetBlockToken,
DatanodeInfo targetDatanode) throws IOException {
dfsClient.copyBlockCrossNamespace(sourceBlk, sourceBlockToken, sourceDatanode, targetBlk,
targetBlockToken, targetDatanode);
}
}
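Taken together, these hooks suggest the intended fastcopy flow: every block is copied DataNode-to-DataNode across namespaces, the target stream itself never writes a byte, and the last copied block is recorded so completeFile() can still close the file. A rough sketch under that reading; the driver method and how target blocks are pre-allocated are hypothetical, and only copyBlockCrossNamespace and setUserAssignmentLastBlock come from this patch:

// Hypothetical fastcopy-style driver (assumes same-package access, since the
// new copy method is protected); everything except the two new calls is
// illustrative.
static void fastCopyBlocks(DFSOutputStream targetStream,
    List<LocatedBlock> sourceBlocks, List<ExtendedBlock> targetBlocks,
    Token<BlockTokenIdentifier> targetToken, DatanodeInfo targetDatanode)
    throws IOException {
  ExtendedBlock lastTarget = null;
  for (int i = 0; i < sourceBlocks.size(); i++) {
    LocatedBlock source = sourceBlocks.get(i);
    ExtendedBlock target = targetBlocks.get(i);
    // Ask a DataNode holding the source replica to push it into the target
    // namespace as the pre-allocated target block.
    targetStream.copyBlockCrossNamespace(source.getBlock(),
        source.getBlockToken(), source.getLocations()[0],
        target, targetToken, targetDatanode);
    lastTarget = target;
  }
  // The streamer never wrote data, so record the last block explicitly;
  // completeFile() falls back to it when getStreamer().getBlock() is null.
  targetStream.setUserAssignmentLastBlock(lastTarget);
  targetStream.close();
}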
DFSStripedOutputStream.java
@@ -1271,6 +1271,9 @@ protected synchronized void closeImpl() throws IOException {

try (TraceScope ignored =
dfsClient.getTracer().newScope("completeFile")) {
if (currentBlockGroup == null) {
currentBlockGroup = getUserAssignmentLastBlock();
}
completeFile(currentBlockGroup);
}
logCorruptBlocks();
DistributedFileSystem.java
@@ -599,8 +599,23 @@ public FSDataOutputStream next(final FileSystem fs, final Path p)
* specifies an explicit storage policy for this file, overriding the
* inherited policy.
*
* @param f the file name to open
* @param permission file permission
* @param flag {@link CreateFlag}s to use for this stream
* @param bufferSize the size of the buffer to be used
* @param replication required block replication for the file
* @param blockSize block size
* @param progress the progress reporter
* @param checksumOpt checksum parameter. If null, the values
* found in conf will be used
* @param favoredNodes favored nodes address list
* @param ecPolicyName file ecPolicyName
* @param storagePolicy file storage policy
* @throws IOException IO failure
* @see #setPermission(Path, FsPermission)
* @return output stream
*/
-  private HdfsDataOutputStream create(final Path f,
+  public HdfsDataOutputStream create(final Path f,
final FsPermission permission, final EnumSet<CreateFlag> flag,
final int bufferSize, final short replication, final long blockSize,
final Progressable progress, final ChecksumOpt checksumOpt,
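Making this overload public lets external callers (for example a fastcopy tool) create the target file with an explicit storage policy. A hedged usage sketch; the trailing parameter order follows the javadoc above and all values are placeholders:

// Illustrative call of the now-public create overload; a null checksumOpt
// falls back to the values in conf, and "COLD" is just an example policy.
DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
try (HdfsDataOutputStream out = dfs.create(
    new Path("/target/file"),
    new FsPermission((short) 0644),
    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
    4096,                     // bufferSize
    (short) 3,                // replication
    128L * 1024 * 1024,       // blockSize
    null,                     // progress
    null,                     // checksumOpt
    null,                     // favoredNodes
    null,                     // ecPolicyName
    "COLD")) {                // storagePolicy
  // Write data here, or leave the stream idle for a block-level copy.
}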
DataTransferProtocol.java
@@ -238,4 +238,19 @@ void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
Token<BlockTokenIdentifier> blockToken,
long requestedNumBytes,
BlockChecksumOptions blockChecksumOptions) throws IOException;

/**
* Copy a block across namespaces.
* It is used for fastcopy.
*
* @param sourceBlk the block being copied.
* @param sourceBlockToken security token for accessing sourceBlk.
* @param targetBlk the block to be written.
* @param targetBlockToken security token for accessing targetBlk.
* @param targetDatanode the target datanode to which sourceBlk will be copied as targetBlk.
* @throws IOException if an I/O error occurred
*/
void copyBlockCrossNamespace(ExtendedBlock sourceBlk,
Token<BlockTokenIdentifier> sourceBlockToken, ExtendedBlock targetBlk,
Token<BlockTokenIdentifier> targetBlockToken, DatanodeInfo targetDatanode) throws IOException;
}
Op.java
@@ -39,6 +39,7 @@ public enum Op {
RELEASE_SHORT_CIRCUIT_FDS((byte)88),
REQUEST_SHORT_CIRCUIT_SHM((byte)89),
BLOCK_GROUP_CHECKSUM((byte)90),
COPY_BLOCK_CROSSNAMESPACE((byte)91),
CUSTOM((byte)127);

/** The code for this operation. */
Sender.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockCrossNamespaceProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
@@ -308,4 +309,18 @@ public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,

send(out, Op.BLOCK_GROUP_CHECKSUM, proto);
}

@Override
public void copyBlockCrossNamespace(ExtendedBlock sourceBlk,
Token<BlockTokenIdentifier> sourceBlockToken, ExtendedBlock targetBlk,
Token<BlockTokenIdentifier> targetBlockToken, DatanodeInfo targetDatanode)
throws IOException {
OpCopyBlockCrossNamespaceProto proto = OpCopyBlockCrossNamespaceProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(sourceBlk, sourceBlockToken))
.setTargetBlock(PBHelperClient.convert(targetBlk))
.setTargetToken(PBHelperClient.convert(targetBlockToken))
.setTargetDatanode(PBHelperClient.convert(targetDatanode)).build();

send(out, Op.COPY_BLOCK_CROSSNAMESPACE, proto);
}
}
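For reference, send(out, op, proto) frames every operation the same way: a two-byte data transfer protocol version, the one-byte opcode, then the varint-delimited protobuf message. Roughly (condensed from the existing Sender helpers, not code added by this patch):

// Condensed view of the existing Sender framing used by send(out, op, proto).
private static void op(DataOutput out, Op opcode) throws IOException {
  out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // 2-byte version
  opcode.write(out);                                          // 1-byte opcode
}

private static void send(DataOutputStream out, Op opcode, Message proto)
    throws IOException {
  op(out, opcode);
  proto.writeDelimitedTo(out); // varint length prefix + serialized message
  out.flush();
}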
datatransfer.proto
@@ -331,3 +331,10 @@ message OpBlockChecksumResponseProto {
message OpCustomProto {
required string customId = 1;
}

message OpCopyBlockCrossNamespaceProto {
required BaseHeaderProto header = 1;
required ExtendedBlockProto targetBlock = 2;
required hadoop.common.TokenProto targetToken = 3;
required DatanodeInfoProto targetDatanode = 4;
}
DFSConfigKeys.java
@@ -136,6 +136,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.ec.reconstruct.write.bandwidthPerSec";
public static final long DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_DEFAULT =
0; // A value of zero indicates no limit
public static final String DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_KEY =
"dfs.datanode.copy.block.cross.namespace.socket-timeout.ms";
public static final int DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_DEFAULT =
5 * 60 * 1000;

@Deprecated
public static final String DFS_DATANODE_READAHEAD_BYTES_KEY =
HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY;
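The new key follows the usual DataNode configuration pattern, so the 5-minute default can be overridden in hdfs-site.xml or programmatically; a minimal example (value illustrative):

// Illustrative override of the new timeout; the default is 5 * 60 * 1000 ms.
Configuration conf = new HdfsConfiguration();
conf.setInt(
    DFSConfigKeys.DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_KEY,
    10 * 60 * 1000); // allow 10 minutes for slow cross-namespace links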
Receiver.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockCrossNamespaceProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
@@ -133,6 +134,9 @@ protected final void processOp(Op op) throws IOException {
case REQUEST_SHORT_CIRCUIT_SHM:
opRequestShortCircuitShm(in);
break;
case COPY_BLOCK_CROSSNAMESPACE:
opCopyBlockCrossNamespace(in);
break;
default:
throw new IOException("Unknown op " + op + " in data stream");
}
@@ -339,4 +343,21 @@ private void opStripedBlockChecksum(DataInputStream dis) throws IOException {
}
}
}

private void opCopyBlockCrossNamespace(DataInputStream dis) throws IOException {
OpCopyBlockCrossNamespaceProto proto =
OpCopyBlockCrossNamespaceProto.parseFrom(vintPrefixed(dis));
TraceScope traceScope = continueTraceSpan(proto.getHeader(), proto.getClass().getSimpleName());
try {
copyBlockCrossNamespace(PBHelperClient.convert(proto.getHeader().getBlock()),
PBHelperClient.convert(proto.getHeader().getToken()),
PBHelperClient.convert(proto.getTargetBlock()),
PBHelperClient.convert(proto.getTargetToken()),
PBHelperClient.convert(proto.getTargetDatanode()));
} finally {
if (traceScope != null) {
traceScope.close();
}
}
}
}
DNConf.java
@@ -27,6 +27,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
@@ -89,6 +91,7 @@ public class DNConf {
final int socketWriteTimeout;
final int socketKeepaliveTimeout;
final int ecChecksumSocketTimeout;
private final int copyBlockCrossNamespaceSocketTimeout;
private final int transferSocketSendBufferSize;
private final int transferSocketRecvBufferSize;
private final boolean tcpNoDelay;
@@ -153,6 +156,9 @@ public DNConf(final Configurable dn) {
ecChecksumSocketTimeout = getConf().getInt(
DFS_CHECKSUM_EC_SOCKET_TIMEOUT_KEY,
DFS_CHECKSUM_EC_SOCKET_TIMEOUT_DEFAULT);
copyBlockCrossNamespaceSocketTimeout = getConf().getInt(
DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_KEY,
DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_DEFAULT);
this.transferSocketSendBufferSize = getConf().getInt(
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT);
@@ -390,6 +396,15 @@ public int getEcChecksumSocketTimeout() {
return ecChecksumSocketTimeout;
}

/**
* Returns the socket timeout for copyBlockCrossNamespace, in milliseconds.
*
* @return socket timeout in milliseconds
*/
public int getCopyBlockCrossNamespaceSocketTimeout() {
return copyBlockCrossNamespaceSocketTimeout;
}

/**
* Returns the SaslPropertiesResolver configured for use with
* DataTransferProtocol, or null if not configured.
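Presumably the DataNode-side handler for COPY_BLOCK_CROSSNAMESPACE applies this timeout when it connects to the target DataNode; a hedged sketch, with everything around the getter hypothetical:

// Hypothetical handler-side use; only getCopyBlockCrossNamespaceSocketTimeout()
// is from this patch.
int timeout = dnConf.getCopyBlockCrossNamespaceSocketTimeout();
Socket sock = new Socket();
sock.connect(NetUtils.createSocketAddr(targetDatanode.getXferAddr()), timeout);
sock.setSoTimeout(timeout); // also bound reads from the target DataNode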