Skip to content

Commit 7c73ba1

Browse files
committed
add exec timeout func
1 parent ac7bf03 commit 7c73ba1

File tree

3 files changed

+55
-32
lines changed

3 files changed

+55
-32
lines changed

pkg/blob/controllerserver.go

Lines changed: 25 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,6 @@ const (
6161
MSI = "MSI"
6262
SPN = "SPN"
6363
authorizationPermissionMismatch = "AuthorizationPermissionMismatch"
64-
65-
waitForAzCopyInterval = 2 * time.Second
6664
)
6765

6866
// CreateVolume provisions a volume
@@ -85,7 +83,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
8583
// logging the job status if it's volume cloning
8684
if req.GetVolumeContentSource() != nil {
8785
jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{})
88-
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
86+
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
8987
}
9088
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
9189
}
@@ -759,43 +757,39 @@ func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken
759757
return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
760758
}
761759

762-
timeAfter := time.After(time.Duration(d.waitForAzCopyTimeoutMinutes) * time.Minute)
763-
timeTick := time.Tick(waitForAzCopyInterval)
764760
srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
765761
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)
766762

767763
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
768764
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
769-
if jobState == util.AzcopyJobError || jobState == util.AzcopyJobCompleted {
765+
switch jobState {
766+
case util.AzcopyJobError, util.AzcopyJobCompleted:
770767
return err
771-
}
772-
klog.V(2).Infof("begin to copy blob container %s to %s", srcContainerName, dstContainerName)
773-
for {
774-
select {
775-
case <-timeTick:
776-
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
777-
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
778-
switch jobState {
779-
case util.AzcopyJobError, util.AzcopyJobCompleted:
780-
return err
781-
case util.AzcopyJobNotFound:
782-
klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
783-
cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
784-
if len(authAzcopyEnv) > 0 {
785-
cmd.Env = append(os.Environ(), authAzcopyEnv...)
786-
}
787-
out, copyErr := cmd.CombinedOutput()
788-
if copyErr != nil {
789-
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstPath, copyErr, string(out))
790-
} else {
791-
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
792-
}
793-
return copyErr
768+
case util.AzcopyJobRunning:
769+
return fmt.Errorf("an existed azcopy job is running, copy percent: %s%%, please wait for it to complete", percent)
770+
case util.AzcopyJobNotFound:
771+
klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
772+
copyErr := util.WaitForExecCompletion(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, func() error {
773+
cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
774+
if len(authAzcopyEnv) > 0 {
775+
cmd.Env = append(os.Environ(), authAzcopyEnv...)
794776
}
795-
case <-timeAfter:
796-
return fmt.Errorf("timeout waiting for copy blob container %s to %s succeed", srcContainerName, dstContainerName)
777+
if out, err := cmd.CombinedOutput(); err != nil {
778+
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
779+
}
780+
return nil
781+
}, func() error {
782+
_, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
783+
return fmt.Errorf("timeout waiting for copy blob container %s to %s succeed, copy percent: %s%%", srcContainerName, dstContainerName, percent)
784+
})
785+
if copyErr != nil {
786+
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", resourceGroupName, accountName, dstPath, copyErr)
787+
} else {
788+
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
797789
}
790+
return copyErr
798791
}
792+
return err
799793
}
800794

801795
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now

pkg/blob/volume_lock.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ import (
2323
)
2424

2525
const (
26-
volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists"
26+
volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists"
27+
volumeOperationAlreadyExistsWithAzcopyFmt = "An operation using azcopy with the given Volume ID %s already exists. Azcopy job status: %s, copy percent: %s%%, error: %v"
2728
)
2829

2930
// VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs

pkg/util/util.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"strconv"
2525
"strings"
2626
"sync"
27+
"time"
2728

2829
"github.com/go-ini/ini"
2930
"github.com/pkg/errors"
@@ -387,3 +388,30 @@ func SetVolumeOwnership(path, gid, policy string) error {
387388
}
388389
return volume.SetVolumeOwnership(&VolumeMounter{path: path}, path, &gidInt64, &fsGroupChangePolicy, nil)
389390
}
391+
392+
// ExecFunc returns a exec function's output and error
393+
type ExecFunc func() (err error)
394+
395+
// TimeoutFunc returns output and error if an ExecFunc timeout
396+
type TimeoutFunc func() (err error)
397+
398+
// WaitForExecCompletion waits for the exec function to complete or times out
399+
func WaitForExecCompletion(timeout time.Duration, execFunc ExecFunc, timeoutFunc TimeoutFunc) error {
400+
// Create a channel to receive the result of the azcopy exec function
401+
done := make(chan bool)
402+
var err error
403+
404+
// Start the azcopy exec function in a goroutine
405+
go func() {
406+
err = execFunc()
407+
done <- true
408+
}()
409+
410+
// Wait for the function to complete or time out
411+
select {
412+
case <-done:
413+
return err
414+
case <-time.After(timeout):
415+
return timeoutFunc()
416+
}
417+
}

0 commit comments

Comments
 (0)