Skip to content

Commit 346eb04

Browse files
Support moving data from one drive to another within the same node
1 parent bc00b70 commit 346eb04

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1693
-138
lines changed

cmd/directpv/controller.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,19 @@ package main
1818

1919
import (
2020
"context"
21+
"os"
22+
"time"
2123

2224
"github.com/container-storage-interface/spec/lib/go/csi"
2325
"github.com/minio/directpv/pkg/consts"
2426
"github.com/minio/directpv/pkg/csi/controller"
2527
pkgidentity "github.com/minio/directpv/pkg/csi/identity"
28+
"github.com/minio/directpv/pkg/jobs"
29+
"github.com/minio/directpv/pkg/k8s"
2630
"github.com/spf13/cobra"
31+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32+
"k8s.io/client-go/tools/leaderelection"
33+
"k8s.io/client-go/tools/leaderelection/resourcelock"
2734
"k8s.io/klog/v2"
2835
)
2936

@@ -75,5 +82,44 @@ func startController(ctx context.Context) error {
7582
}
7683
}()
7784

85+
go func() {
86+
runJobsController(ctx)
87+
}()
88+
7889
return <-errCh
7990
}
91+
92+
func runJobsController(ctx context.Context) {
93+
podName := os.Getenv("HOSTNAME")
94+
if podName == "" {
95+
klog.V(5).Info("unable to get the pod name from env; defaulting to pod name: directpv-controller")
96+
podName = "directpv-controller"
97+
}
98+
lock := &resourcelock.LeaseLock{
99+
LeaseMeta: metav1.ObjectMeta{
100+
Name: consts.AppName + "-jobs-controller",
101+
Namespace: consts.AppNamespace,
102+
},
103+
Client: k8s.KubeClient().CoordinationV1(),
104+
LockConfig: resourcelock.ResourceLockConfig{
105+
Identity: podName,
106+
},
107+
}
108+
// start the leader election code loop
109+
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
110+
Lock: lock,
111+
ReleaseOnCancel: true,
112+
LeaseDuration: 60 * time.Second,
113+
RenewDeadline: 15 * time.Second,
114+
RetryPeriod: 5 * time.Second,
115+
Callbacks: leaderelection.LeaderCallbacks{
116+
OnStartedLeading: func(ctx context.Context) {
117+
klog.Info("started leading")
118+
jobs.StartController(ctx)
119+
},
120+
OnStoppedLeading: func() {
121+
klog.Infof("leader lost")
122+
},
123+
},
124+
})
125+
}

cmd/directpv/copy.go

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
// This file is part of MinIO DirectPV
2+
// Copyright (c) 2023 MinIO, Inc.
3+
//
4+
// This program is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Affero General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// This program is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Affero General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Affero General Public License
15+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package main
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
"io"
24+
"os"
25+
"path/filepath"
26+
"strings"
27+
"syscall"
28+
"time"
29+
30+
"github.com/dustin/go-humanize"
31+
"github.com/minio/directpv/pkg/client"
32+
"github.com/minio/directpv/pkg/sys"
33+
"github.com/minio/directpv/pkg/types"
34+
"github.com/minio/directpv/pkg/xfs"
35+
xfilepath "github.com/minio/filepath"
36+
"github.com/spf13/cobra"
37+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38+
"k8s.io/klog/v2"
39+
)
40+
41+
var (
42+
volumeID string
43+
dryRunFlag bool
44+
)
45+
46+
var copyCmd = &cobra.Command{
47+
Use: "copy SRC-DRIVE DEST-DRIVE --volume-id VOLUME-ID",
48+
Short: "copy the volume data from source drive to destination drive",
49+
Aliases: []string{"cp"},
50+
SilenceUsage: true,
51+
SilenceErrors: true,
52+
RunE: func(c *cobra.Command, args []string) error {
53+
switch len(args) {
54+
case 0:
55+
return errors.New("source and destination DRIVE-IDs should be provided")
56+
case 1:
57+
return errors.New("both the source and destination DRIVE-IDs should be provided")
58+
case 2:
59+
default:
60+
return errors.New("invalid syntax")
61+
}
62+
if volumeID == "" {
63+
return errors.New("'--volume-id' should be provided")
64+
}
65+
if args[0] == args[1] {
66+
return errors.New("both the source and destination DRIVE-IDs are same")
67+
}
68+
69+
ctx := c.Context()
70+
srcDrive, err := client.DriveClient().Get(ctx, args[0], metav1.GetOptions{
71+
TypeMeta: types.NewDriveTypeMeta(),
72+
})
73+
if err != nil {
74+
return err
75+
}
76+
destDrive, err := client.DriveClient().Get(ctx, args[1], metav1.GetOptions{
77+
TypeMeta: types.NewDriveTypeMeta(),
78+
})
79+
if err != nil {
80+
return err
81+
}
82+
volume, err := client.VolumeClient().Get(ctx, volumeID, metav1.GetOptions{
83+
TypeMeta: types.NewVolumeTypeMeta(),
84+
})
85+
if err != nil {
86+
return err
87+
}
88+
if !destDrive.VolumeExist(volumeID) {
89+
return errors.New("volume finalizer not found on the destination drive")
90+
}
91+
if volume.GetNodeID() != nodeID {
92+
return errors.New("the nodeID in the volume doesn't match")
93+
}
94+
if err := checkDrive(srcDrive); err != nil {
95+
klog.ErrorS(err, "unable to check the source drive", "driveID", srcDrive.Name)
96+
return err
97+
}
98+
if err := checkDrive(destDrive); err != nil {
99+
klog.ErrorS(err, "unable to check the destination drive", "driveID", destDrive.Name)
100+
return err
101+
}
102+
err = startCopy(ctx, srcDrive, destDrive, volume)
103+
if err != nil {
104+
klog.ErrorS(err, "unable to copy", "source", srcDrive.Name, "destination", destDrive.Name)
105+
}
106+
return err
107+
},
108+
}
109+
110+
func init() {
111+
copyCmd.PersistentFlags().StringVar(&volumeID, "volume-id", volumeID, "Set the volumeID of the volume to be copied")
112+
copyCmd.PersistentFlags().BoolVar(&dryRunFlag, "dry-run", dryRunFlag, "Enable dry-run mode")
113+
}
114+
115+
func checkDrive(drive *types.Drive) error {
116+
if drive.GetNodeID() != nodeID {
117+
return errors.New("the nodeID in the drive doesn't match")
118+
}
119+
if _, err := os.Lstat(types.GetVolumeRootDir(drive.Status.FSUUID)); err != nil {
120+
return fmt.Errorf("unable to stat the volume root directory; %v", err)
121+
}
122+
if _, err := sys.GetDeviceByFSUUID(drive.Status.FSUUID); err != nil {
123+
return fmt.Errorf("unable to find device by its FSUUID; %v", err)
124+
}
125+
return nil
126+
}
127+
128+
func startCopy(ctx context.Context, srcDrive, destDrive *types.Drive, volume *types.Volume) error {
129+
if dryRunFlag {
130+
return nil
131+
}
132+
133+
sourcePath := types.GetVolumeDir(srcDrive.Status.FSUUID, volume.Name)
134+
destPath := types.GetVolumeDir(destDrive.Status.FSUUID, volume.Name)
135+
136+
if _, err := os.Lstat(sourcePath); err != nil {
137+
return fmt.Errorf("unable to stat the sourcePath %v; %v", sourcePath, err)
138+
}
139+
if err := sys.Mkdir(destPath, 0o755); err != nil && !errors.Is(err, os.ErrExist) {
140+
return fmt.Errorf("unable to create the targetPath %v; %v", destPath, err)
141+
}
142+
143+
quota := xfs.Quota{
144+
HardLimit: uint64(volume.Status.TotalCapacity),
145+
SoftLimit: uint64(volume.Status.TotalCapacity),
146+
}
147+
if err := xfs.SetQuota(ctx, "/dev/"+string(destDrive.GetDriveName()), destPath, volume.Name, quota, false); err != nil {
148+
return fmt.Errorf("unable to set quota on volume data path; %w", err)
149+
}
150+
151+
ctxWitCancel, cancel := context.WithCancel(ctx)
152+
defer func() {
153+
cancel()
154+
printProgress(ctx, srcDrive, destDrive, volume)
155+
}()
156+
go func() {
157+
logProgress(ctxWitCancel, srcDrive, destDrive, volume)
158+
}()
159+
160+
return copyData(sourcePath, destPath)
161+
}
162+
163+
func printProgress(ctx context.Context, srcDrive, destDrive *types.Drive, volume *types.Volume) error {
164+
sourceQ, err := xfs.GetQuota(ctx, "/dev/"+string(srcDrive.GetDriveName()), volume.Name)
165+
if err != nil {
166+
klog.ErrorS(err, "unable to get quota of the source drive", "source drive", srcDrive.GetDriveName(), "volume", volume.Name)
167+
return err
168+
}
169+
destQ, err := xfs.GetQuota(ctx, "/dev/"+string(destDrive.GetDriveName()), volume.Name)
170+
if err != nil {
171+
klog.ErrorS(err, "unable to get quota of the destination drive", "destination drive", destDrive.GetDriveName(), "volume", volume.Name)
172+
return err
173+
}
174+
fmt.Printf("\nCopied %v/%v", humanize.IBytes(destQ.CurrentSpace), humanize.IBytes(sourceQ.CurrentSpace))
175+
return nil
176+
}
177+
178+
func logProgress(ctx context.Context, srcDrive, destDrive *types.Drive, volume *types.Volume) {
179+
ticker := time.NewTicker(10 * time.Second)
180+
defer ticker.Stop()
181+
182+
for {
183+
select {
184+
case <-ctx.Done():
185+
return
186+
case <-ticker.C:
187+
if err := printProgress(ctx, srcDrive, destDrive, volume); err != nil {
188+
return
189+
}
190+
}
191+
}
192+
}
193+
194+
func copyData(source, destination string) error {
195+
visitFn := func(f string, fi os.FileInfo, _ error) error {
196+
targetPath := filepath.Join(destination, strings.TrimPrefix(f, source))
197+
switch {
198+
case fi.Mode()&os.ModeDir != 0:
199+
return os.MkdirAll(targetPath, fi.Mode().Perm())
200+
case fi.Mode()&os.ModeType == 0:
201+
if targetFi, err := os.Lstat(targetPath); err == nil {
202+
if targetFi.ModTime().Equal(fi.ModTime()) && targetFi.Size() == fi.Size() {
203+
return nil
204+
}
205+
}
206+
reader, err := os.Open(f)
207+
if err != nil {
208+
return err
209+
}
210+
writer, err := os.OpenFile(targetPath, os.O_RDWR|os.O_CREATE, 0o755)
211+
if err != nil {
212+
return err
213+
}
214+
if _, err := io.CopyN(writer, reader, fi.Size()); err != nil {
215+
return err
216+
}
217+
stat, ok := fi.Sys().(*syscall.Stat_t)
218+
if !ok {
219+
return fmt.Errorf("unable to get the stat information for %v", f)
220+
}
221+
if err := os.Chown(targetPath, int(stat.Uid), int(stat.Gid)); err != nil {
222+
return fmt.Errorf("unable to set UID and GID to path %v; %v", targetPath, err)
223+
}
224+
if err := os.Chmod(targetPath, fi.Mode().Perm()); err != nil {
225+
return fmt.Errorf("unable to chmod on path %v; %v", targetPath, err)
226+
}
227+
return os.Chtimes(targetPath, fi.ModTime(), fi.ModTime())
228+
case fi.Mode()&os.ModeSymlink != 0:
229+
// ToDo: Handle symlink
230+
return nil
231+
default:
232+
// unsupported modes
233+
return nil
234+
}
235+
}
236+
return xfilepath.Walk(source, visitFn)
237+
}

cmd/directpv/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ func init() {
128128
mainCmd.AddCommand(legacyControllerCmd)
129129
mainCmd.AddCommand(legacyNodeServerCmd)
130130
mainCmd.AddCommand(nodeControllerCmd)
131+
mainCmd.AddCommand(copyCmd)
131132
}
132133

133134
func main() {

cmd/kubectl-directpv/clean.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"os"
2424
"strings"
2525

26+
directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
2627
"github.com/minio/directpv/pkg/client"
2728
"github.com/minio/directpv/pkg/consts"
2829
"github.com/minio/directpv/pkg/k8s"
@@ -152,6 +153,9 @@ func cleanMain(ctx context.Context) {
152153
List(ctx)
153154

154155
matchFunc := func(volume *types.Volume) bool {
156+
if volume.Status.Status == directpvtypes.VolumeStatusCopying {
157+
return false
158+
}
155159
pv, err := k8s.KubeClient().CoreV1().PersistentVolumes().Get(ctx, volume.Name, metav1.GetOptions{})
156160
if err != nil {
157161
if apierrors.IsNotFound(err) {

0 commit comments

Comments
 (0)