pkg/backup/backupdest/backup_index.go (35 changes: 5 additions & 30 deletions)
@@ -204,18 +204,11 @@ func ListIndexes(
 }
 
 // GetBackupTreeIndexMetadata concurrently retrieves the index metadata for all
-// backups within the specified subdir, up to the specified end time, inclusive.
-// The store should be rooted at the collection URI that contains the `index/`
-// directory. Indexes are returned in ascending end time order, with ties broken
-// by ascending start time order. If the end time is not covered by the backups
-// in the subdir, an error is returned.
-//
-// Note: If endTime is provided, GetBackupTreeIndexMetadata will return ALL
-// backups that could be used to restore to endTime. So even if a compacted
-// backup can be used to restore to endTime, the incremental backups that
-// make up the compacted backup will also be returned.
+// backups within the specified subdir. The store should be rooted at the
+// collection URI that contains the `index/` directory. Indexes are returned in
+// ascending end time order, with ties broken by ascending start time order.
 func GetBackupTreeIndexMetadata(
-	ctx context.Context, store cloud.ExternalStorage, subdir string, endTime hlc.Timestamp,
+	ctx context.Context, store cloud.ExternalStorage, subdir string,
 ) ([]backuppb.BackupIndexMetadata, error) {
 	indexBasenames, err := ListIndexes(ctx, store, subdir)
 	if err != nil {
@@ -252,25 +245,7 @@ func GetBackupTreeIndexMetadata(
 		return nil, errors.Wrapf(err, "getting backup index metadata")
 	}
 
-	if endTime.IsEmpty() {
-		return indexes, nil
-	}
-
-	coveringIdx := slices.IndexFunc(indexes, func(index backuppb.BackupIndexMetadata) bool {
-		return index.StartTime.Less(endTime) && endTime.LessEq(index.EndTime)
-	})
-	if coveringIdx == -1 {
-		return nil, errors.Newf(`backups in "%s" do not cover end time %s`, subdir, endTime)
-	}
-	coverEndTime := indexes[coveringIdx].EndTime
-	// To include all components of a compacted backup, we need to include all
-	// backups with the same end time.
-	for ; coveringIdx < len(indexes); coveringIdx++ {
-		if !indexes[coveringIdx].EndTime.Equal(coverEndTime) {
-			break
-		}
-	}
-	return indexes[:coveringIdx], nil
+	return indexes, nil
 }
 
 // ParseBackupFilePathFromIndexFileName parses the path to a backup given the
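For callers that still need the old "restore to endTime" behavior, the following is a minimal, hedged sketch (not part of this PR) of how the deleted filtering block could be reapplied to the slice returned by the new GetBackupTreeIndexMetadata signature. The helper name filterIndexesThroughEndTime is hypothetical and the import paths are assumptions based on this PR's package layout; the body mirrors the removed code above.

// Hypothetical helper, not introduced by this PR.
package backupdest

import (
	"slices"

	"github.com/cockroachdb/cockroach/pkg/backup/backuppb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/errors"
)

// filterIndexesThroughEndTime reapplies the end-time filtering that was
// removed from GetBackupTreeIndexMetadata: it keeps every index needed to
// restore to endTime, including all backups that share the covering end time
// so the components of a compacted backup stay together.
func filterIndexesThroughEndTime(
	indexes []backuppb.BackupIndexMetadata, subdir string, endTime hlc.Timestamp,
) ([]backuppb.BackupIndexMetadata, error) {
	if endTime.IsEmpty() {
		return indexes, nil
	}
	// Find the first index whose (StartTime, EndTime] window covers endTime.
	coveringIdx := slices.IndexFunc(indexes, func(index backuppb.BackupIndexMetadata) bool {
		return index.StartTime.Less(endTime) && endTime.LessEq(index.EndTime)
	})
	if coveringIdx == -1 {
		return nil, errors.Newf(`backups in "%s" do not cover end time %s`, subdir, endTime)
	}
	// Extend past the covering index to include every index with the same end time.
	coverEndTime := indexes[coveringIdx].EndTime
	for ; coveringIdx < len(indexes); coveringIdx++ {
		if !indexes[coveringIdx].EndTime.Equal(coverEndTime) {
			break
		}
	}
	return indexes[:coveringIdx], nil
}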
pkg/backup/backupdest/backup_index_test.go (73 changes: 3 additions & 70 deletions)
@@ -565,94 +565,28 @@ func TestGetBackupTreeIndexMetadata(t *testing.T) {
 	testcases := []struct {
 		name  string
 		chain chain
-		// endTime filter. Set to 0 for no filter.
-		endTime int
-		error   string
+		error string
 		// expectedIndexTimes should be sorted in ascending order by end time, with
 		// ties broken by ascending start time.
 		expectedIndexTimes chain
 	}{
 		{
-			name:               "fetch all indexes from subdir",
+			name:               "fetch all indexes from chain with no compacted backups",
 			chain:              simpleChain,
 			expectedIndexTimes: [][2]int{{0, 2}, {2, 4}, {4, 6}, {6, 8}},
 		},
-		{
-			name:               "exact end time match",
-			chain:              simpleChain,
-			endTime:            6,
-			expectedIndexTimes: [][2]int{{0, 2}, {2, 4}, {4, 6}},
-		},
-		{
-			name:               "end time between an incremental",
-			chain:              simpleChain,
-			endTime:            5,
-			expectedIndexTimes: [][2]int{{0, 2}, {2, 4}, {4, 6}},
-		},
-		{
-			name:               "end time before full backup end",
-			chain:              simpleChain,
-			endTime:            1,
-			expectedIndexTimes: chain{{0, 2}},
-		},
-		{
-			name:    "end time after the chain",
-			chain:   simpleChain,
-			endTime: 10,
-			error:   "do not cover end time",
-		},
 		{
 			name:               "fetch all indexes from tree with compacted backups",
 			chain:              compactedChain,
 			expectedIndexTimes: chain{{0, 10}, {10, 11}, {10, 12}, {11, 12}, {12, 14}, {14, 16}},
 		},
-		{
-			name:               "end time of compacted backup",
-			chain:              compactedChain,
-			endTime:            12,
-			expectedIndexTimes: chain{{0, 10}, {10, 11}, {10, 12}, {11, 12}},
-		},
-		{
-			name:               "end time between incremental after compacted backup",
-			chain:              compactedChain,
-			endTime:            13,
-			expectedIndexTimes: chain{{0, 10}, {10, 11}, {10, 12}, {11, 12}, {12, 14}},
-		},
-		{
-			name:               "end time between compacted backup",
-			chain:              compactedChain,
-			endTime:            11,
-			expectedIndexTimes: chain{{0, 10}, {10, 11}},
-		},
-		{
-			name:               "end time before compacted backup",
-			chain:              compactedChain,
-			endTime:            11,
-			expectedIndexTimes: chain{{0, 10}, {10, 11}},
-		},
 		{
 			name:  "fetch all indexes from tree with double compacted backups",
 			chain: doubleCompactedChain,
 			expectedIndexTimes: chain{
 				{0, 18}, {18, 20}, {18, 22}, {20, 22}, {22, 24}, {18, 26}, {24, 26},
 			},
 		},
-		{
-			name:    "end time before second compacted backup but after first",
-			chain:   doubleCompactedChain,
-			endTime: 24,
-			expectedIndexTimes: chain{
-				{0, 18}, {18, 20}, {18, 22}, {20, 22}, {22, 24},
-			},
-		},
-		{
-			name:    "end time of second compacted backup",
-			chain:   doubleCompactedChain,
-			endTime: 26,
-			expectedIndexTimes: chain{
-				{0, 18}, {18, 20}, {18, 22}, {20, 22}, {22, 24}, {18, 26}, {24, 26},
-			},
-		},
 		{
 			name:               "index only contains a full backup",
 			chain:              fullOnly,
@@ -664,9 +598,8 @@ func TestGetBackupTreeIndexMetadata(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			subdirTS := intToTime(tc.chain[0][1]).GoTime()
 			subdir := subdirTS.Format(backupbase.DateBasedIntoFolderName)
-			end := intToTime(tc.endTime)
 
-			metadatas, err := GetBackupTreeIndexMetadata(ctx, externalStorage, subdir, end)
+			metadatas, err := GetBackupTreeIndexMetadata(ctx, externalStorage, subdir)
 			if tc.error != "" {
 				require.ErrorContains(t, err, tc.error)
 				return