Skip to content

Commit d911d76

Browse files
committed
HDDS-1898. GrpcReplicationService#download cannot replicate the container.
1 parent 0b9704f commit d911d76

File tree

7 files changed

+68
-29
lines changed

7 files changed

+68
-29
lines changed

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@
1919
package org.apache.hadoop.ozone.container.common.interfaces;
2020

2121

22-
import java.io.FileInputStream;
2322
import java.io.IOException;
23+
import java.io.InputStream;
24+
import java.io.OutputStream;
2425

2526
import org.apache.hadoop.conf.Configuration;
2627
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -109,17 +110,26 @@ public abstract ContainerCommandResponseProto handle(
109110
DispatcherContext dispatcherContext);
110111

111112
/**
112-
* Import container data from a raw input stream.
113+
* Imports container from a raw input stream.
113114
*/
114115
public abstract Container importContainer(
115116
long containerID,
116117
long maxSize,
117118
String originPipelineId,
118119
String originNodeId,
119-
FileInputStream rawContainerStream,
120+
InputStream rawContainerStream,
120121
TarContainerPacker packer)
121122
throws IOException;
122123

124+
/**
125+
* Exports container to the output stream.
126+
*/
127+
public abstract void exportContainer(
128+
final Container container,
129+
final OutputStream outputStream,
130+
final TarContainerPacker packer)
131+
throws IOException;
132+
123133
/**
124134
* Stop the Handler.
125135
*/

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,12 @@ public void handle(SCMCommand command, OzoneContainer container,
127127
case KeyValueContainer:
128128
KeyValueContainerData containerData = (KeyValueContainerData)
129129
cont.getContainerData();
130-
deleteKeyValueContainerBlocks(containerData, entry);
130+
cont.writeLock();
131+
try {
132+
deleteKeyValueContainerBlocks(containerData, entry);
133+
} finally {
134+
cont.writeUnlock();
135+
}
131136
txResultBuilder.setContainerID(containerId)
132137
.setSuccess(true);
133138
break;

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,9 @@ public void close() throws StorageContainerException {
328328
} finally {
329329
writeUnlock();
330330
}
331+
LOG.info("Container {} is closed with bcsId {}.",
332+
containerData.getContainerID(),
333+
containerData.getBlockCommitSequenceId());
331334
}
332335

333336
/**
@@ -359,13 +362,10 @@ private void updateContainerData(Runnable update)
359362
}
360363
}
361364

362-
void compactDB() throws StorageContainerException {
365+
private void compactDB() throws StorageContainerException {
363366
try {
364367
try(ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) {
365368
db.getStore().compactDB();
366-
LOG.info("Container {} is closed with bcsId {}.",
367-
containerData.getContainerID(),
368-
containerData.getBlockCommitSequenceId());
369369
}
370370
} catch (StorageContainerException ex) {
371371
throw ex;
@@ -522,6 +522,7 @@ public void exportContainerData(OutputStream destination,
522522
"Only closed containers could be exported: ContainerId="
523523
+ getContainerData().getContainerID());
524524
}
525+
compactDB();
525526
packer.pack(this, destination);
526527
}
527528

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818

1919
package org.apache.hadoop.ozone.container.keyvalue;
2020

21-
import java.io.FileInputStream;
2221
import java.io.IOException;
22+
import java.io.InputStream;
23+
import java.io.OutputStream;
2324
import java.nio.ByteBuffer;
2425
import java.util.HashMap;
2526
import java.util.LinkedList;
@@ -841,13 +842,14 @@ private void checkContainerOpen(KeyValueContainer kvContainer)
841842
throw new StorageContainerException(msg, result);
842843
}
843844

844-
public Container importContainer(long containerID, long maxSize,
845-
String originPipelineId,
846-
String originNodeId,
847-
FileInputStream rawContainerStream,
848-
TarContainerPacker packer)
845+
@Override
846+
public Container importContainer(final long containerID,
847+
final long maxSize, final String originPipelineId,
848+
final String originNodeId, final InputStream rawContainerStream,
849+
final TarContainerPacker packer)
849850
throws IOException {
850851

852+
// TODO: Add layout version!
851853
KeyValueContainerData containerData =
852854
new KeyValueContainerData(containerID,
853855
maxSize, originPipelineId, originNodeId);
@@ -862,6 +864,20 @@ public Container importContainer(long containerID, long maxSize,
862864

863865
}
864866

867+
@Override
868+
public void exportContainer(final Container container,
869+
final OutputStream outputStream,
870+
final TarContainerPacker packer)
871+
throws IOException{
872+
container.readLock();
873+
try {
874+
final KeyValueContainer kvc = (KeyValueContainer) container;
875+
kvc.exportContainerData(outputStream, packer);
876+
} finally {
877+
container.readUnlock();
878+
}
879+
}
880+
865881
@Override
866882
public void markContainerForClose(Container container)
867883
throws IOException {

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@
2929
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
3030
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
3131

32-
import java.io.FileInputStream;
3332
import java.io.IOException;
33+
import java.io.InputStream;
34+
import java.io.OutputStream;
3435
import java.util.Iterator;
3536
import java.util.Map;
3637

@@ -120,13 +121,20 @@ public void closeContainer(final long containerId) throws IOException {
120121

121122
public Container importContainer(final ContainerType type,
122123
final long containerId, final long maxSize, final String originPipelineId,
123-
final String originNodeId, final FileInputStream rawContainerStream,
124+
final String originNodeId, final InputStream rawContainerStream,
124125
final TarContainerPacker packer)
125126
throws IOException {
126127
return handlers.get(type).importContainer(containerId, maxSize,
127128
originPipelineId, originNodeId, rawContainerStream, packer);
128129
}
129130

131+
public void exportContainer(final ContainerType type,
132+
final long containerId, final OutputStream outputStream,
133+
final TarContainerPacker packer) throws IOException {
134+
handlers.get(type).exportContainer(
135+
containerSet.getContainer(containerId), outputStream, packer);
136+
}
137+
130138
/**
131139
* Deletes a container given its Id.
132140
* @param containerId Id of the container to be deleted

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.io.OutputStream;
2222

2323
import org.apache.hadoop.ozone.container.common.interfaces.Container;
24-
import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
2524
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
2625

2726
import com.google.common.base.Preconditions;
@@ -41,7 +40,7 @@ public class OnDemandContainerReplicationSource
4140

4241
private ContainerController controller;
4342

44-
private ContainerPacker packer = new TarContainerPacker();
43+
private TarContainerPacker packer = new TarContainerPacker();
4544

4645
public OnDemandContainerReplicationSource(
4746
ContainerController controller) {
@@ -59,18 +58,11 @@ public void copyData(long containerId, OutputStream destination)
5958

6059
Container container = controller.getContainer(containerId);
6160

62-
Preconditions
63-
.checkNotNull(container, "Container is not found " + containerId);
61+
Preconditions.checkNotNull(
62+
container, "Container is not found " + containerId);
6463

65-
switch (container.getContainerType()) {
66-
case KeyValueContainer:
67-
packer.pack(container,
68-
destination);
69-
break;
70-
default:
71-
LOG.warn("Container type " + container.getContainerType()
72-
+ " is not replicable as no compression algorithm for that.");
73-
}
64+
controller.exportContainer(
65+
container.getContainerType(), containerId, destination, packer);
7466

7567
}
7668
}

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
4747
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
4848
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
49+
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
4950
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
5051
import org.apache.hadoop.test.GenericTestUtils;
5152

@@ -119,6 +120,12 @@ public void testContainerReplication() throws Exception {
119120
chooseDatanodeWithoutContainer(sourcePipelines,
120121
cluster.getHddsDatanodes());
121122

123+
// Close the container
124+
cluster.getStorageContainerManager().getScmNodeManager()
125+
.addDatanodeCommand(
126+
sourceDatanodes.get(0).getUuid(),
127+
new CloseContainerCommand(containerId, sourcePipelines.getId(), true));
128+
122129
//WHEN: send the order to replicate the container
123130
cluster.getStorageContainerManager().getScmNodeManager()
124131
.addDatanodeCommand(destinationDatanode.getDatanodeDetails().getUuid(),

0 commit comments

Comments
 (0)