Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/blob/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func IsAzureStackCloud(cloud *azure.Cloud) bool {
}

// getCloudProvider get Azure Cloud Provider
func getCloudProvider(kubeconfig, nodeID, secretName, secretNamespace, userAgent string, allowEmptyCloudConfig bool, kubeAPIQPS float64, kubeAPIBurst int) (*azure.Cloud, error) {
func GetCloudProvider(kubeconfig, nodeID, secretName, secretNamespace, userAgent string, allowEmptyCloudConfig bool, kubeAPIQPS float64, kubeAPIBurst int) (*azure.Cloud, error) {
var (
config *azure.Config
kubeClient *clientset.Clientset
Expand Down
2 changes: 1 addition & 1 deletion pkg/blob/azure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ users:
}
os.Setenv(DefaultAzureCredentialFileEnv, fakeCredFile)
}
cloud, err := getCloudProvider(test.kubeconfig, test.nodeID, "", "", test.userAgent, test.allowEmptyCloudConfig, 25.0, 50)
cloud, err := GetCloudProvider(test.kubeconfig, test.nodeID, "", "", test.userAgent, test.allowEmptyCloudConfig, 25.0, 50)
if !reflect.DeepEqual(err, test.expectedErr) && test.expectedErr != nil && !strings.Contains(err.Error(), test.expectedErr.Error()) {
t.Errorf("desc: %s,\n input: %q, GetCloudProvider err: %v, expectedErr: %v", test.desc, test.kubeconfig, err, test.expectedErr)
}
Expand Down
58 changes: 15 additions & 43 deletions pkg/blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,22 +153,15 @@ var (
type DriverOptions struct {
NodeID string
DriverName string
CloudConfigSecretName string
CloudConfigSecretNamespace string
CustomUserAgent string
UserAgentSuffix string
BlobfuseProxyEndpoint string
EnableBlobfuseProxy bool
BlobfuseProxyConnTimout int
EnableBlobMockMount bool
AllowEmptyCloudConfig bool
AllowInlineVolumeKeyAccessWithIdentity bool
EnableGetVolumeStats bool
AppendTimeStampInCacheDir bool
AppendMountErrorHelpLink bool
MountPermissions uint64
KubeAPIQPS float64
KubeAPIBurst int
EnableAznfsMount bool
VolStatsCacheExpireInMinutes int
SasTokenExpirationMinutes int
Expand All @@ -178,24 +171,17 @@ type DriverOptions struct {
type Driver struct {
csicommon.CSIDriver

cloud *azure.Cloud
cloudConfigSecretName string
cloudConfigSecretNamespace string
customUserAgent string
userAgentSuffix string
blobfuseProxyEndpoint string
cloud *azure.Cloud
blobfuseProxyEndpoint string
// enableBlobMockMount is only for testing, DO NOT set as true in non-testing scenario
enableBlobMockMount bool
enableBlobfuseProxy bool
allowEmptyCloudConfig bool
enableGetVolumeStats bool
allowInlineVolumeKeyAccessWithIdentity bool
appendTimeStampInCacheDir bool
appendMountErrorHelpLink bool
blobfuseProxyConnTimout int
mountPermissions uint64
kubeAPIQPS float64
kubeAPIBurst int
enableAznfsMount bool
mounter *mount.SafeFormatAndMount
volLockMap *util.LockMap
Expand All @@ -220,26 +206,19 @@ type Driver struct {

// NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
// does not support optional driver plugin info manifest field. Refer to CSI spec for more details.
func NewDriver(options *DriverOptions) *Driver {
func NewDriver(options *DriverOptions, cloud *azure.Cloud) *Driver {
d := Driver{
volLockMap: util.NewLockMap(),
subnetLockMap: util.NewLockMap(),
volumeLocks: newVolumeLocks(),
cloudConfigSecretName: options.CloudConfigSecretName,
cloudConfigSecretNamespace: options.CloudConfigSecretNamespace,
customUserAgent: options.CustomUserAgent,
userAgentSuffix: options.UserAgentSuffix,
blobfuseProxyEndpoint: options.BlobfuseProxyEndpoint,
enableBlobfuseProxy: options.EnableBlobfuseProxy,
allowInlineVolumeKeyAccessWithIdentity: options.AllowInlineVolumeKeyAccessWithIdentity,
blobfuseProxyConnTimout: options.BlobfuseProxyConnTimout,
enableBlobMockMount: options.EnableBlobMockMount,
allowEmptyCloudConfig: options.AllowEmptyCloudConfig,
enableGetVolumeStats: options.EnableGetVolumeStats,
appendMountErrorHelpLink: options.AppendMountErrorHelpLink,
mountPermissions: options.MountPermissions,
kubeAPIQPS: options.KubeAPIQPS,
kubeAPIBurst: options.KubeAPIBurst,
enableAznfsMount: options.EnableAznfsMount,
sasTokenExpirationMinutes: options.SasTokenExpirationMinutes,
azcopy: &util.Azcopy{},
Expand All @@ -263,25 +242,7 @@ func NewDriver(options *DriverOptions) *Driver {
if d.volStatsCache, err = azcache.NewTimedCache(time.Duration(options.VolStatsCacheExpireInMinutes)*time.Minute, getter, false); err != nil {
klog.Fatalf("%v", err)
}
return &d
}

// Run driver initialization
func (d *Driver) Run(endpoint, kubeconfig string, testBool bool) {
versionMeta, err := GetVersionYAML(d.Name)
if err != nil {
klog.Fatalf("%v", err)
}
klog.Infof("\nDRIVER INFORMATION:\n-------------------\n%s\n\nStreaming logs below:", versionMeta)

userAgent := GetUserAgent(d.Name, d.customUserAgent, d.userAgentSuffix)
klog.V(2).Infof("driver userAgent: %s", userAgent)
d.cloud, err = getCloudProvider(kubeconfig, d.NodeID, d.cloudConfigSecretName, d.cloudConfigSecretNamespace, userAgent, d.allowEmptyCloudConfig, d.kubeAPIQPS, d.kubeAPIBurst)
if err != nil {
klog.Fatalf("failed to get Azure Cloud Provider, error: %v", err)
}
klog.V(2).Infof("cloud: %s, location: %s, rg: %s, VnetName: %s, VnetResourceGroup: %s, SubnetName: %s", d.cloud.Cloud, d.cloud.Location, d.cloud.ResourceGroup, d.cloud.VnetName, d.cloud.VnetResourceGroup, d.cloud.SubnetName)

d.cloud = cloud
d.mounter = &mount.SafeFormatAndMount{
Interface: mount.New(""),
Exec: utilexec.New(),
Expand Down Expand Up @@ -316,6 +277,17 @@ func (d *Driver) Run(endpoint, kubeconfig string, testBool bool) {
}
d.AddNodeServiceCapabilities(nodeCap)

return &d
}

// Run driver initialization
func (d *Driver) Run(endpoint string, testBool bool) {
versionMeta, err := GetVersionYAML(d.Name)
if err != nil {
klog.Fatalf("%v", err)
}
klog.Infof("\nDRIVER INFORMATION:\n-------------------\n%s\n\nStreaming logs below:", versionMeta)

s := csicommon.NewNonBlockingGRPCServer()
// Driver d act as IdentityServer, ControllerServer and NodeServer
s.Start(endpoint, d, d, d, testBool)
Expand Down
11 changes: 5 additions & 6 deletions pkg/blob/blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ func NewFakeDriver() *Driver {
BlobfuseProxyConnTimout: 5,
EnableBlobMockMount: false,
}
driver := NewDriver(&driverOptions)
driver := NewDriver(&driverOptions, &azure.Cloud{})
driver.Name = fakeDriverName
driver.Version = vendorVersion
driver.subnetLockMap = util.NewLockMap()
driver.cloud = &azure.Cloud{}
return driver
}

Expand All @@ -73,7 +72,7 @@ func TestNewFakeDriver(t *testing.T) {
BlobfuseProxyConnTimout: 5,
EnableBlobMockMount: false,
}
d := NewDriver(&driverOptions)
d := NewDriver(&driverOptions, &azure.Cloud{})
assert.NotNil(t, d)
}

Expand All @@ -86,7 +85,7 @@ func TestNewDriver(t *testing.T) {
BlobfuseProxyConnTimout: 5,
EnableBlobMockMount: false,
}
driver := NewDriver(&driverOptions)
driver := NewDriver(&driverOptions, &azure.Cloud{})
fakedriver := NewFakeDriver()
fakedriver.Name = DefaultDriverName
fakedriver.Version = driverVersion
Expand Down Expand Up @@ -134,7 +133,7 @@ func TestRun(t *testing.T) {
os.Setenv(DefaultAzureCredentialFileEnv, fakeCredFile)

d := NewFakeDriver()
d.Run("tcp://127.0.0.1:0", "", true)
d.Run("tcp://127.0.0.1:0", true)
},
},
{
Expand All @@ -161,7 +160,7 @@ func TestRun(t *testing.T) {
d := NewFakeDriver()
d.cloud = &azure.Cloud{}
d.NodeID = ""
d.Run("tcp://127.0.0.1:0", "", true)
d.Run("tcp://127.0.0.1:0", true)
},
},
}
Expand Down
41 changes: 0 additions & 41 deletions pkg/blob/controllerserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,18 +163,6 @@ func TestCreateVolume(t *testing.T) {
name string
testFunc func(t *testing.T)
}{
{
name: "invalid create volume req",
testFunc: func(t *testing.T) {
d := NewFakeDriver()
req := &csi.CreateVolumeRequest{}
_, err := d.CreateVolume(context.Background(), req)
expectedErr := status.Error(codes.InvalidArgument, "CREATE_DELETE_VOLUME")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("actualErr: (%v), expectedErr: (%v)", err, expectedErr)
}
},
},
{
name: "volume Name missing",
testFunc: func(t *testing.T) {
Expand Down Expand Up @@ -893,20 +881,6 @@ func TestDeleteVolume(t *testing.T) {
}
},
},
{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case is deleted because fake driver has got required capabilities.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the result of this test now? invalid create volume req" ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

original test is for testing invalid create volume req, not capabilities

name: "invalid delete volume req",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you also add this ut back?

testFunc: func(t *testing.T) {
d := NewFakeDriver()
req := &csi.DeleteVolumeRequest{
VolumeId: "unit-test",
}
_, err := d.DeleteVolume(context.Background(), req)
expectedErr := status.Errorf(codes.Internal, "invalid delete volume req: volume_id:\"unit-test\" ")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("actualErr: (%v), expectedErr: (%v)", err, expectedErr)
}
},
},
{
name: "invalid volume Id",
testFunc: func(t *testing.T) {
Expand Down Expand Up @@ -1298,21 +1272,6 @@ func TestControllerExpandVolume(t *testing.T) {
}
},
},
{
name: "invalid expand volume req",
testFunc: func(t *testing.T) {
d := NewFakeDriver()
req := &csi.ControllerExpandVolumeRequest{
VolumeId: "unit-test",
CapacityRange: &csi.CapacityRange{},
}
_, err := d.ControllerExpandVolume(context.Background(), req)
expectedErr := status.Errorf(codes.Internal, "invalid expand volume req: volume_id:\"unit-test\" capacity_range:<> ")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("actualErr: (%v), expectedErr: (%v)", err, expectedErr)
}
},
},
{
name: "Volume Size exceeds Max Container Size",
testFunc: func(t *testing.T) {
Expand Down
21 changes: 12 additions & 9 deletions pkg/blobplugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,31 +81,34 @@ func handle() {
driverOptions := blob.DriverOptions{
NodeID: *nodeID,
DriverName: *driverName,
CloudConfigSecretName: *cloudConfigSecretName,
CloudConfigSecretNamespace: *cloudConfigSecretNamespace,
BlobfuseProxyEndpoint: *blobfuseProxyEndpoint,
EnableBlobfuseProxy: *enableBlobfuseProxy,
BlobfuseProxyConnTimout: *blobfuseProxyConnTimout,
EnableBlobMockMount: *enableBlobMockMount,
CustomUserAgent: *customUserAgent,
UserAgentSuffix: *userAgentSuffix,
AllowEmptyCloudConfig: *allowEmptyCloudConfig,
EnableGetVolumeStats: *enableGetVolumeStats,
AppendTimeStampInCacheDir: *appendTimeStampInCacheDir,
MountPermissions: *mountPermissions,
AllowInlineVolumeKeyAccessWithIdentity: *allowInlineVolumeKeyAccessWithIdentity,
AppendMountErrorHelpLink: *appendMountErrorHelpLink,
KubeAPIQPS: *kubeAPIQPS,
KubeAPIBurst: *kubeAPIBurst,
EnableAznfsMount: *enableAznfsMount,
VolStatsCacheExpireInMinutes: *volStatsCacheExpireInMinutes,
SasTokenExpirationMinutes: *sasTokenExpirationMinutes,
}
driver := blob.NewDriver(&driverOptions)

userAgent := blob.GetUserAgent(driverOptions.DriverName, *customUserAgent, *userAgentSuffix)
klog.V(2).Infof("driver userAgent: %s", userAgent)

cloud, err := blob.GetCloudProvider(*kubeconfig, driverOptions.NodeID, *cloudConfigSecretName, *cloudConfigSecretNamespace, userAgent, *allowEmptyCloudConfig, *kubeAPIQPS, *kubeAPIBurst)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not put all these into DriverOptions? that's the reason why DriverOptions exists, it could save parameter num

*cloudConfigSecretName, *cloudConfigSecretNamespace, userAgent, *allowEmptyCloudConfig, *kubeAPIQPS, *kubeAPIBurst

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little bit complicated to add mock for GetCloudProvider

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but why move part of the options as parameters? this is not consistent

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I move other dependencies out of newDriver function? In that case that would be easier to compose driver object and there is no need to implement FakeDriver.

if err != nil {
klog.Fatalf("failed to get Azure Cloud Provider, error: %v", err)
}
klog.V(2).Infof("cloud: %s, location: %s, rg: %s, VnetName: %s, VnetResourceGroup: %s, SubnetName: %s", cloud.Cloud, cloud.Location, cloud.ResourceGroup, cloud.VnetName, cloud.VnetResourceGroup, cloud.SubnetName)

driver := blob.NewDriver(&driverOptions, cloud)
if driver == nil {
klog.Fatalln("Failed to initialize Azure Blob Storage CSI driver")
}
driver.Run(*endpoint, *kubeconfig, false)
driver.Run(*endpoint, false)
}

func exportMetrics() {
Expand Down
8 changes: 5 additions & 3 deletions test/e2e/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ var _ = ginkgo.SynchronizedBeforeSuite(func(ctx ginkgo.SpecContext) []byte {
// spin up a blob driver locally to make use of the azure client and controller service
kubeconfig := os.Getenv(kubeconfigEnvVar)
_, useBlobfuseProxy := os.LookupEnv("ENABLE_BLOBFUSE_PROXY")
os.Setenv("AZURE_CREDENTIAL_FILE", credentials.TempAzureCredentialFilePath)
driverOptions := blob.DriverOptions{
NodeID: os.Getenv("nodeid"),
DriverName: blob.DefaultDriverName,
Expand All @@ -150,10 +151,11 @@ var _ = ginkgo.SynchronizedBeforeSuite(func(ctx ginkgo.SpecContext) []byte {
BlobfuseProxyConnTimout: 5,
EnableBlobMockMount: false,
}
blobDriver = blob.NewDriver(&driverOptions)
cloud, err := blob.GetCloudProvider(kubeconfig, driverOptions.NodeID, "", "", "", false, 0, 0)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
blobDriver = blob.NewDriver(&driverOptions, cloud)
go func() {
os.Setenv("AZURE_CREDENTIAL_FILE", credentials.TempAzureCredentialFilePath)
blobDriver.Run(fmt.Sprintf("unix:///tmp/csi-%s.sock", uuid.NewUUID().String()), kubeconfig, false)
blobDriver.Run(fmt.Sprintf("unix:///tmp/csi-%s.sock", uuid.NewUUID().String()), false)
}()
})

Expand Down