Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
*/
package org.apache.hadoop.hbase.backup;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.LONG_OPTION_ENABLE_CONTINUOUS_BACKUP;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BACKUP_LIST_DESC;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH_DESC;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP_DESC;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM_DESC;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_KEEP;
Expand Down Expand Up @@ -159,7 +162,8 @@ protected void addOptions() {
addOptWithArg(OPTION_PATH, OPTION_PATH_DESC);
addOptWithArg(OPTION_KEEP, OPTION_KEEP_DESC);
addOptWithArg(OPTION_YARN_QUEUE_NAME, OPTION_YARN_QUEUE_NAME_DESC);

addOptNoArg(OPTION_ENABLE_CONTINUOUS_BACKUP, LONG_OPTION_ENABLE_CONTINUOUS_BACKUP,
OPTION_ENABLE_CONTINUOUS_BACKUP_DESC);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public enum BackupState {
*/
public enum BackupPhase {
REQUEST,
SETUP_WAL_REPLICATION,
SNAPSHOT,
PREPARE_INCREMENTAL,
SNAPSHOTCOPY,
Expand Down Expand Up @@ -170,6 +171,8 @@ public enum BackupPhase {
*/
private boolean noChecksumVerify;

private boolean continuousBackupEnabled;

/**
 * No-arg constructor: initializes an empty per-table backup status map. All other fields keep
 * their Java defaults until set explicitly via setters.
 */
public BackupInfo() {
backupTableInfoMap = new HashMap<>();
}
Expand All @@ -185,6 +188,7 @@ public BackupInfo(String backupId, BackupType type, TableName[] tables, String t
}
this.startTs = 0;
this.completeTs = 0;
this.continuousBackupEnabled = false;
}

public int getWorkers() {
Expand Down Expand Up @@ -592,4 +596,12 @@ public int compareTo(BackupInfo o) {
Long otherTS = Long.valueOf(o.getBackupId().substring(o.getBackupId().lastIndexOf("_") + 1));
return thisTS.compareTo(otherTS);
}

/**
 * Marks whether this backup participates in continuous backup (WAL replication based).
 * @param continuousBackupEnabled true if this backup is part of a continuous backup process
 */
public void setContinuousBackupEnabled(boolean continuousBackupEnabled) {
this.continuousBackupEnabled = continuousBackupEnabled;
}

/**
 * @return true if this backup is part of a continuous backup process
 */
public boolean isContinuousBackupEnabled() {
return this.continuousBackupEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public Builder withYarnPoolName(String name) {
return this;
}

/**
 * Sets whether the backup request should enable continuous backup.
 * @param continuousBackupEnabled true to flag the request as a continuous backup request
 * @return this builder, for chaining
 */
public Builder withContinuousBackupEnabled(boolean continuousBackupEnabled) {
request.setContinuousBackupEnabled(continuousBackupEnabled);
return this;
}

/**
 * Returns the assembled request. NOTE(review): this hands out the builder's internal instance
 * rather than a copy, so further builder calls after build() mutate the returned request.
 * @return the configured {@link BackupRequest}
 */
public BackupRequest build() {
return request;
}
Expand All @@ -89,6 +94,7 @@ public BackupRequest build() {
private boolean noChecksumVerify = false;
private String backupSetName;
private String yarnPoolName;
private boolean continuousBackupEnabled;

private BackupRequest() {
}
Expand Down Expand Up @@ -163,4 +169,12 @@ public String getYarnPoolName() {
/**
 * Sets the Yarn pool (queue) name the backup job should run in.
 * @param yarnPoolName Yarn pool name; may be null if unset
 */
public void setYarnPoolName(String yarnPoolName) {
this.yarnPoolName = yarnPoolName;
}

/**
 * Private on purpose: callers must go through {@code Builder#withContinuousBackupEnabled}.
 * @param continuousBackupEnabled true if the request is a continuous backup request
 */
private void setContinuousBackupEnabled(boolean continuousBackupEnabled) {
this.continuousBackupEnabled = continuousBackupEnabled;
}

/**
 * @return true if this request enables (or continues) continuous backup
 */
public boolean isContinuousBackupEnabled() {
return this.continuousBackupEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ public interface BackupRestoreConstants {
String OPTION_YARN_QUEUE_NAME_DESC = "Yarn queue name to run backup create command on";
String OPTION_YARN_QUEUE_NAME_RESTORE_DESC = "Yarn queue name to run backup restore command on";

String OPTION_ENABLE_CONTINUOUS_BACKUP = "cb";
String LONG_OPTION_ENABLE_CONTINUOUS_BACKUP = "continuous-backup-enabled";
String OPTION_ENABLE_CONTINUOUS_BACKUP_DESC =
"Flag indicating that the full backup is part of a continuous backup process.";

String JOB_NAME_CONF_KEY = "mapreduce.job.name";

String BACKUP_CONFIG_STRING =
Expand All @@ -122,6 +127,13 @@ public interface BackupRestoreConstants {

String BACKUPID_PREFIX = "backup_";

String CONTINUOUS_BACKUP_REPLICATION_PEER = "continuous_backup_replication_peer";

String DEFAULT_CONTINUOUS_BACKUP_REPLICATION_ENDPOINT =
"org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint";

String CONF_CONTINUOUS_BACKUP_WAL_DIR = "hbase.backup.continuous.wal.dir";

enum BackupCommand {
CREATE,
CANCEL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,8 @@ public String backupTables(BackupRequest request) throws IOException {
request = builder.withBackupType(request.getBackupType()).withTableList(tableList)
.withTargetRootDir(request.getTargetRootDir()).withBackupSetName(request.getBackupSetName())
.withTotalTasks(request.getTotalTasks()).withBandwidthPerTasks((int) request.getBandwidth())
.withNoChecksumVerify(request.getNoChecksumVerify()).build();
.withNoChecksumVerify(request.getNoChecksumVerify())
.withContinuousBackupEnabled(request.isContinuousBackupEnabled()).build();

TableBackupClient client;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH_DESC;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP_DESC;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM_DESC;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_KEEP;
Expand All @@ -45,6 +47,7 @@
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
Expand Down Expand Up @@ -339,14 +342,64 @@ public void execute() throws IOException {

boolean ignoreChecksum = cmdline.hasOption(OPTION_IGNORECHECKSUM);

BackupType backupType = BackupType.valueOf(args[1].toUpperCase());
List<TableName> tableNameList = null;
if (tables != null) {
tableNameList = Lists.newArrayList(BackupUtils.parseTableNames(tables));
}
boolean continuousBackup = cmdline.hasOption(OPTION_ENABLE_CONTINUOUS_BACKUP);
if (continuousBackup && !BackupType.FULL.equals(backupType)) {
System.out.println("ERROR: Continuous backup can Only be specified for Full Backup");
printUsage();
throw new IOException(INCORRECT_USAGE);
}
Comment on lines +351 to +355
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just wondering, since continuous backup is meaningful only for full backups, what if we introduce a new backup type? Maybe we already discussed this, I can't remember.

Usage: hbase backup create <type> <backup_path> [options]
  type           "full" to create a full backup image
                 "incremental" to create an incremental backup image
                 "continuous" to create new continuous backup
  backup_path     Full path to store the backup image

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we discussed this and agreed to go with a new flag for the full backup.

  • A continuous backup just takes a full backup and starts the WAL replication process. This happens only once—after that, we only need to take full backups. So, I don’t see a need for a separate backup type.
  • Also, introducing a new type would unnecessarily differentiate full and continuous backups.
  • In the future, we want to phase out the old incremental backup implementation. The plan is to use WALs replicated through continuous backup for incremental backups. So eventually, we might just have FULL and INCREMENTAL backups, where incremental backups rely on continuously replicated WALs.
  • From a code perspective, our backup code has a lot of (almost everywhere) if-else checks for backup types. Adding a new type would mean modifying all of them.

What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I'm convinced; let's go with this. I didn't remember the discussion.


/*
* The `continuousBackup` flag is specified only during the first full backup to initiate
* continuous WAL replication. After that, it is redundant because the tables are already set
* up for continuous backup. If the `continuousBackup` flag is not explicitly enabled, we need
* to determine the backup mode based on the current state of the specified tables: - If all
* the specified tables are already part of continuous backup, we treat the request as a
* continuous backup request and proceed accordingly (since these tables are already
* continuously backed up, no additional setup is needed). - If none of the specified tables
* are part of continuous backup, we treat the request as a normal full backup without
* continuous backup. - If the request includes a mix of tables—some with continuous backup
* enabled and others without—we cannot determine a clear backup strategy. In this case, we
* throw an error. If all tables are already in continuous backup mode, we explicitly set the
* `continuousBackup` flag to `true` so that the request is processed using the continuous
* backup approach rather than the normal full backup flow.
*/
if (!continuousBackup && tableNameList != null && !tableNameList.isEmpty()) {
try (BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) {
Set<TableName> continuousBackupTableSet =
backupSystemTable.getContinuousBackupTableSet().keySet();

boolean allTablesInContinuousBackup = continuousBackupTableSet.containsAll(tableNameList);
boolean noTablesInContinuousBackup =
tableNameList.stream().noneMatch(continuousBackupTableSet::contains);

// Ensure that all tables are either fully in continuous backup or not at all
if (!allTablesInContinuousBackup && !noTablesInContinuousBackup) {
System.err
.println("ERROR: Some tables are already in continuous backup, while others are not. "
+ "Cannot mix both in a single request.");
printUsage();
throw new IOException(INCORRECT_USAGE);
}

// If all tables are already in continuous backup, enable the flag
if (allTablesInContinuousBackup) {
continuousBackup = true;
}
}
}

try (BackupAdminImpl admin = new BackupAdminImpl(conn)) {
BackupRequest.Builder builder = new BackupRequest.Builder();
BackupRequest request = builder.withBackupType(BackupType.valueOf(args[1].toUpperCase()))
.withTableList(
tables != null ? Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null)
BackupRequest request = builder.withBackupType(backupType).withTableList(tableNameList)
.withTargetRootDir(targetBackupDir).withTotalTasks(workers)
.withBandwidthPerTasks(bandwidth).withNoChecksumVerify(ignoreChecksum)
.withBackupSetName(setName).build();
.withBackupSetName(setName).withContinuousBackupEnabled(continuousBackup).build();
String backupId = admin.backupTables(request);
System.out.println("Backup session " + backupId + " finished. Status: SUCCESS");
} catch (IOException e) {
Expand Down Expand Up @@ -400,6 +453,8 @@ protected void printUsage() {
options.addOption(OPTION_YARN_QUEUE_NAME, true, OPTION_YARN_QUEUE_NAME_DESC);
options.addOption(OPTION_DEBUG, false, OPTION_DEBUG_DESC);
options.addOption(OPTION_IGNORECHECKSUM, false, OPTION_IGNORECHECKSUM_DESC);
options.addOption(OPTION_ENABLE_CONTINUOUS_BACKUP, false,
OPTION_ENABLE_CONTINUOUS_BACKUP_DESC);

HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.setLeftPadding(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ public void close() {
* @throws BackupException exception
*/
public BackupInfo createBackupInfo(String backupId, BackupType type, List<TableName> tableList,
String targetRootDir, int workers, long bandwidth, boolean noChecksumVerify)
throws BackupException {
String targetRootDir, int workers, long bandwidth, boolean noChecksumVerify,
boolean continuousBackupEnabled) throws BackupException {
if (targetRootDir == null) {
throw new BackupException("Wrong backup request parameter: target backup root directory");
}
Expand Down Expand Up @@ -232,6 +232,7 @@ public BackupInfo createBackupInfo(String backupId, BackupType type, List<TableN
backupInfo.setBandwidth(bandwidth);
backupInfo.setWorkers(workers);
backupInfo.setNoChecksumVerify(noChecksumVerify);
backupInfo.setContinuousBackupEnabled(continuousBackupEnabled);
return backupInfo;
}

Expand Down Expand Up @@ -422,4 +423,17 @@ public void addIncrementalBackupTableSet(Set<TableName> tables) throws IOExcepti
public Connection getConnection() {
return conn;
}

/**
 * Adds a set of tables to the global continuous backup set. Only tables that do not already have
 * continuous backup enabled will be updated; tables already in the set keep their original start
 * timestamp. Delegates to the backup system table.
 * @param tables set of tables to add to continuous backup
 * @param startTimestamp timestamp indicating when continuous backup started for newly added
 * tables
 * @throws IOException if an error occurs while updating the backup system table
 */
public void addContinuousBackupTableSet(Set<TableName> tables, long startTimestamp)
throws IOException {
systemTable.addContinuousBackupTableSet(tables, startTimestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public String toString() {
private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no");

private final static String INCR_BACKUP_SET = "incrbackupset:";
private final static String CONTINUOUS_BACKUP_SET = "continuousbackupset";
private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
private final static String RS_LOG_TS_PREFIX = "rslogts:";

Expand Down Expand Up @@ -1025,6 +1026,37 @@ public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOE
}
}

/**
 * Returns the set of tables currently enrolled in continuous backup, together with the
 * timestamp at which continuous backup started for each table.
 * @return map keyed by table name whose value is that table's continuous-backup start time;
 *         empty when no tables are enrolled
 * @throws IOException if the backup system table cannot be read
 */
public Map<TableName, Long> getContinuousBackupTableSet() throws IOException {
  LOG.trace("Retrieving continuous backup table set from the backup system table.");
  // TreeMap keeps the returned tables in a deterministic (sorted) order.
  Map<TableName, Long> backupTables = new TreeMap<>();

  try (Table systemTable = connection.getTable(tableName)) {
    Result result = systemTable.get(createGetForContinuousBackupTableSet());
    if (!result.isEmpty()) {
      // Each cell stores qualifier = table name, value = start timestamp.
      for (Cell cell : result.listCells()) {
        TableName backupTable = TableName.valueOf(CellUtil.cloneQualifier(cell));
        backupTables.put(backupTable, Bytes.toLong(CellUtil.cloneValue(cell)));
      }
    }
  }

  return backupTables;
}

/**
* Add tables to global incremental backup set
* @param tables set of tables
Expand All @@ -1046,6 +1078,34 @@ public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoo
}
}

/**
 * Registers the given tables in the global continuous backup set. Tables that already have
 * continuous backup enabled are skipped, so their original start timestamps are preserved.
 * @param tables set of tables to add
 * @param startTimestamp timestamp indicating when continuous backup started
 * @throws IOException if an error occurs while updating the backup system table
 */
public void addContinuousBackupTableSet(Set<TableName> tables, long startTimestamp)
  throws IOException {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Add continuous backup table set to backup system table. tables ["
      + StringUtils.join(tables, " ") + "]");
  }
  if (LOG.isDebugEnabled()) {
    for (TableName entry : tables) {
      LOG.debug(Objects.toString(entry));
    }
  }

  // Snapshot of the tables already enrolled, used to avoid overwriting their timestamps.
  Map<TableName, Long> alreadyEnrolled = getContinuousBackupTableSet();

  try (Table sysTable = connection.getTable(tableName)) {
    Put mutation = createPutForContinuousBackupTableSet(tables, alreadyEnrolled, startTimestamp);
    // Nothing to write when every requested table was already enrolled.
    if (!mutation.isEmpty()) {
      sysTable.put(mutation);
    }
  }
}

/**
* Deletes incremental backup set for a backup destination
* @param backupRoot backup root
Expand Down Expand Up @@ -1374,6 +1434,18 @@ private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException
return get;
}

/**
 * Builds the Get used to read the continuous backup table set row from the backup system table.
 * @return a Get scoped to the META family of the continuous backup set row, reading only the
 *         latest version of each cell
 */
private Get createGetForContinuousBackupTableSet() throws IOException {
  Get request = new Get(rowkey(CONTINUOUS_BACKUP_SET));
  request.addFamily(BackupSystemTable.META_FAMILY);
  request.readVersions(1);
  return request;
}

/**
* Creates Put to store incremental backup table set
* @param tables tables
Expand All @@ -1388,6 +1460,28 @@ private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupR
return put;
}

/**
 * Builds the Put that stores the continuous backup table set. Tables already present in
 * {@code existingTables} are skipped so their original start timestamps are not overwritten;
 * the resulting Put may therefore be empty.
 * @param tables tables requested for continuous backup
 * @param existingTables tables that already have continuous backup enabled
 * @param startTimestamp timestamp recorded for each newly added table
 * @return a Put containing one column per newly enrolled table (possibly empty)
 */
private Put createPutForContinuousBackupTableSet(Set<TableName> tables,
  Map<TableName, Long> existingTables, long startTimestamp) {
  Put put = new Put(rowkey(CONTINUOUS_BACKUP_SET));

  for (TableName candidate : tables) {
    if (existingTables.containsKey(candidate)) {
      // Already enrolled; keep the original start timestamp.
      continue;
    }
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(candidate.getNameAsString()),
      Bytes.toBytes(startTimestamp));
  }

  return put;
}

/**
* Creates Delete for incremental backup table set
* @param backupRoot backup root
Expand Down
Loading