Skip to content

Commit 5ac2a73

Browse files
ankitsolvinayakphegde
authored and committed
HBASE-29310 Handle Bulk Load Operations in Continuous Backup (apache#7150)
Signed-off-by: Tak Lon (Stephen) Wu <[email protected]> Reviewed by: Kevin Geiszler <[email protected]>
1 parent 618b751 commit 5ac2a73

File tree

9 files changed

+135
-14
lines changed

9 files changed

+135
-14
lines changed

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.hadoop.fs.Path;
4242
import org.apache.hadoop.hbase.HBaseConfiguration;
4343
import org.apache.hadoop.hbase.TableName;
44+
import org.apache.hadoop.hbase.backup.BackupType;
4445
import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
4546
import org.apache.hadoop.hbase.backup.RestoreRequest;
4647
import org.apache.hadoop.hbase.backup.util.BackupUtils;
@@ -248,6 +249,8 @@ private PitrBackupMetadata getValidBackup(TableName sTableName, TableName tTable
248249

249250
try {
250251
if (backupAdmin.validateRequest(restoreRequest)) {
252+
// check if any bulkload entry exists post this backup time and before "endtime"
253+
checkBulkLoadAfterBackup(conn, sTableName, backup, endTime);
251254
return backup;
252255
}
253256
} catch (IOException e) {
@@ -259,6 +262,31 @@ private PitrBackupMetadata getValidBackup(TableName sTableName, TableName tTable
259262
return null;
260263
}
261264

265+
/**
266+
* Checks if any bulk load operation occurred for the specified table post last successful backup
267+
* and before restore time.
268+
* @param conn Active HBase connection
269+
* @param sTableName Table for which to check bulk load history
270+
* @param backup Last successful backup before the target recovery time
271+
* @param endTime Target recovery time
272+
* @throws IOException if a bulkload entry is found in between backup time and endtime
273+
*/
274+
private void checkBulkLoadAfterBackup(Connection conn, TableName sTableName,
275+
PitrBackupMetadata backup, long endTime) throws IOException {
276+
try (BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) {
277+
List<BulkLoad> bulkLoads = backupSystemTable.readBulkloadRows(List.of(sTableName));
278+
for (BulkLoad load : bulkLoads) {
279+
long lastBackupTs = (backup.getType() == BackupType.FULL)
280+
? backup.getStartTs()
281+
: backup.getIncrCommittedWalTs();
282+
if (lastBackupTs < load.getTimestamp() && load.getTimestamp() < endTime) {
283+
throw new IOException("Bulk load operation detected after last successful backup for "
284+
+ "table: " + sTableName);
285+
}
286+
}
287+
}
288+
}
289+
262290
/**
263291
* Determines if the given backup is valid for PITR.
264292
* <p>

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.List;
2121
import org.apache.hadoop.hbase.TableName;
22+
import org.apache.hadoop.hbase.backup.BackupType;
2223
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
2324
import org.apache.yetus.audience.InterfaceAudience;
2425

@@ -57,4 +58,14 @@ public String getBackupId() {
5758
public String getRootDir() {
5859
return image.getRootDir();
5960
}
61+
62+
@Override
63+
public BackupType getType() {
64+
return image.getType();
65+
}
66+
67+
@Override
68+
public long getIncrCommittedWalTs() {
69+
return image.getIncrCommittedWalTs();
70+
}
6071
}

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.List;
2121
import org.apache.hadoop.hbase.TableName;
2222
import org.apache.hadoop.hbase.backup.BackupInfo;
23+
import org.apache.hadoop.hbase.backup.BackupType;
2324
import org.apache.yetus.audience.InterfaceAudience;
2425

2526
/**
@@ -57,4 +58,14 @@ public String getBackupId() {
5758
public String getRootDir() {
5859
return info.getBackupRootDir();
5960
}
61+
62+
@Override
63+
public BackupType getType() {
64+
return info.getType();
65+
}
66+
67+
@Override
68+
public long getIncrCommittedWalTs() {
69+
return info.getIncrCommittedWalTs();
70+
}
6071
}

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ Builder withCompleteTime(long completeTime) {
101101
return this;
102102
}
103103

104+
Builder withIncrCommittedWalTs(long incrCommittedWalTs) {
105+
image.setIncrCommittedWalTs(incrCommittedWalTs);
106+
return this;
107+
}
108+
104109
BackupImage build() {
105110
return image;
106111
}
@@ -115,6 +120,7 @@ BackupImage build() {
115120
private long completeTs;
116121
private ArrayList<BackupImage> ancestors;
117122
private Map<TableName, Map<String, Long>> incrTimeRanges;
123+
private long incrCommittedWalTs;
118124

119125
static Builder newBuilder() {
120126
return new Builder();
@@ -125,20 +131,22 @@ public BackupImage() {
125131
}
126132

127133
private BackupImage(String backupId, BackupType type, String rootDir, List<TableName> tableList,
128-
long startTs, long completeTs) {
134+
long startTs, long completeTs, long incrCommittedWalTs) {
129135
this.backupId = backupId;
130136
this.type = type;
131137
this.rootDir = rootDir;
132138
this.tableList = tableList;
133139
this.startTs = startTs;
134140
this.completeTs = completeTs;
141+
this.incrCommittedWalTs = incrCommittedWalTs;
135142
}
136143

137144
static BackupImage fromProto(BackupProtos.BackupImage im) {
138145
String backupId = im.getBackupId();
139146
String rootDir = im.getBackupRootDir();
140147
long startTs = im.getStartTs();
141148
long completeTs = im.getCompleteTs();
149+
long incrCommittedWalTs = im.getIncrCommittedWalTs();
142150
List<HBaseProtos.TableName> tableListList = im.getTableListList();
143151
List<TableName> tableList = new ArrayList<>();
144152
for (HBaseProtos.TableName tn : tableListList) {
@@ -151,7 +159,8 @@ static BackupImage fromProto(BackupProtos.BackupImage im) {
151159
? BackupType.FULL
152160
: BackupType.INCREMENTAL;
153161

154-
BackupImage image = new BackupImage(backupId, type, rootDir, tableList, startTs, completeTs);
162+
BackupImage image = new BackupImage(backupId, type, rootDir, tableList, startTs, completeTs,
163+
incrCommittedWalTs);
155164
for (BackupProtos.BackupImage img : ancestorList) {
156165
image.addAncestor(fromProto(img));
157166
}
@@ -170,6 +179,7 @@ BackupProtos.BackupImage toProto() {
170179
builder.setBackupId(backupId);
171180
builder.setCompleteTs(completeTs);
172181
builder.setStartTs(startTs);
182+
builder.setIncrCommittedWalTs(incrCommittedWalTs);
173183
if (type == BackupType.FULL) {
174184
builder.setBackupType(BackupProtos.BackupType.FULL);
175185
} else {
@@ -287,6 +297,14 @@ public long getCompleteTs() {
287297
return completeTs;
288298
}
289299

300+
public long getIncrCommittedWalTs() {
301+
return incrCommittedWalTs;
302+
}
303+
304+
public void setIncrCommittedWalTs(long incrCommittedWalTs) {
305+
this.incrCommittedWalTs = incrCommittedWalTs;
306+
}
307+
290308
private void setCompleteTs(long completeTs) {
291309
this.completeTs = completeTs;
292310
}

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.List;
2121
import org.apache.hadoop.hbase.TableName;
2222
import org.apache.hadoop.hbase.backup.BackupInfo;
23+
import org.apache.hadoop.hbase.backup.BackupType;
2324
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
2425
import org.apache.yetus.audience.InterfaceAudience;
2526

@@ -47,4 +48,10 @@ public interface PitrBackupMetadata {
4748

4849
/** Returns Root directory where the backup is stored */
4950
String getRootDir();
51+
52+
/** Returns backup type */
53+
BackupType getType();
54+
55+
/** Returns incrCommittedWalTs */
56+
long getIncrCommittedWalTs();
5057
}

hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
2222
import static org.junit.Assert.assertEquals;
2323
import static org.junit.Assert.assertFalse;
24+
import static org.junit.Assert.assertNotEquals;
2425
import static org.junit.Assert.assertTrue;
2526

2627
import java.io.IOException;
@@ -33,16 +34,13 @@
3334
import org.apache.hadoop.fs.FileStatus;
3435
import org.apache.hadoop.fs.FileSystem;
3536
import org.apache.hadoop.fs.Path;
36-
import org.apache.hadoop.hbase.CellUtil;
3737
import org.apache.hadoop.hbase.HBaseClassTestRule;
3838
import org.apache.hadoop.hbase.TableName;
3939
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
4040
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
4141
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
4242
import org.apache.hadoop.hbase.backup.util.BackupUtils;
43-
import org.apache.hadoop.hbase.client.Get;
4443
import org.apache.hadoop.hbase.client.Put;
45-
import org.apache.hadoop.hbase.client.Result;
4644
import org.apache.hadoop.hbase.client.Table;
4745
import org.apache.hadoop.hbase.testclassification.LargeTests;
4846
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
@@ -68,8 +66,6 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
6866
private static final Logger LOG =
6967
LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class);
7068

71-
private byte[] ROW = Bytes.toBytes("row1");
72-
private final byte[] COLUMN = Bytes.toBytes("col");
7369
private static final int ROWS_IN_BULK_LOAD = 100;
7470

7571
@Test
@@ -186,11 +182,55 @@ public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws
186182
}
187183
}
188184

189-
private void verifyTable(Table t1) throws IOException {
190-
Get g = new Get(ROW);
191-
Result r = t1.get(g);
192-
assertEquals(1, r.size());
193-
assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN));
185+
@Test
186+
public void testPitrFailureDueToMissingBackupPostBulkload() throws Exception {
187+
conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
188+
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
189+
TableName tableName1 = TableName.valueOf("table_" + methodName);
190+
TEST_UTIL.createTable(tableName1, famName);
191+
try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
192+
193+
// The test starts with no data, and no bulk loaded rows.
194+
int expectedRowCount = 0;
195+
assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
196+
assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());
197+
198+
// Create continuous backup, bulk loads are now being tracked
199+
String backup1 = backupTables(BackupType.FULL, List.of(tableName1), BACKUP_ROOT_DIR, true);
200+
assertTrue(checkSucceeded(backup1));
201+
202+
loadTable(TEST_UTIL.getConnection().getTable(tableName1));
203+
expectedRowCount = expectedRowCount + NB_ROWS_IN_BATCH;
204+
performBulkLoad("bulkPreIncr", methodName, tableName1);
205+
expectedRowCount += ROWS_IN_BULK_LOAD;
206+
assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
207+
assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
208+
209+
loadTable(TEST_UTIL.getConnection().getTable(tableName1));
210+
Thread.sleep(5000);
211+
212+
// Incremental backup
213+
String backup2 =
214+
backupTables(BackupType.INCREMENTAL, List.of(tableName1), BACKUP_ROOT_DIR, true);
215+
assertTrue(checkSucceeded(backup2));
216+
assertEquals(0, systemTable.readBulkloadRows(List.of(tableName1)).size());
217+
218+
performBulkLoad("bulkPostIncr", methodName, tableName1);
219+
assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
220+
221+
loadTable(TEST_UTIL.getConnection().getTable(tableName1));
222+
Thread.sleep(10000);
223+
long restoreTs = BackupUtils.getReplicationCheckpoint(TEST_UTIL.getConnection());
224+
225+
// expect restore failure due to no backup post bulkPostIncr bulkload
226+
TableName restoredTable = TableName.valueOf("restoredTable");
227+
String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { tableName1 },
228+
new TableName[] { restoredTable }, restoreTs, null);
229+
int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
230+
assertNotEquals("Restore should fail since there is one bulkload without any backup", 0, ret);
231+
} finally {
232+
conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
233+
}
194234
}
195235

196236
private void performBulkLoad(String keyPrefix, String testDir, TableName tableName)

hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ private static void setUpBackups() throws Exception {
6767
// Simulate a backup taken 20 days ago
6868
EnvironmentEdgeManager
6969
.injectEdge(() -> System.currentTimeMillis() - 20 * ONE_DAY_IN_MILLISECONDS);
70-
PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Insert initial data into
71-
// table1
70+
// Insert initial data into table1
71+
PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000);
7272

7373
// Perform a full backup for table1 with continuous backup enabled
7474
String[] args =

hbase-protocol-shaded/src/main/protobuf/Backup.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ message BackupImage {
6565
optional uint64 complete_ts = 6;
6666
repeated BackupImage ancestors = 7;
6767
repeated TableServerTimestamp tst_map = 8;
68+
optional uint64 incr_committed_wal_ts = 9;
6869

6970
}
7071

hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1195,6 +1195,11 @@ public int run(String[] args) throws Exception {
11951195
public static void main(String[] args) throws Exception {
11961196
Configuration conf = HBaseConfiguration.create();
11971197
int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
1198+
if (ret == 0) {
1199+
System.out.println("Bulk load completed successfully.");
1200+
System.out.println("IMPORTANT: Please take a backup of the table immediately if this table "
1201+
+ "is part of continuous backup");
1202+
}
11981203
System.exit(ret);
11991204
}
12001205

0 commit comments

Comments (0)