Skip to content

Commit 20b7b5a

Browse files
authored
Merge pull request #1748 from mythi/PR-2024-013
pkg/deviceplugin: move to grpc.NewClient()
2 parents 11c9753 + 4d858c5 commit 20b7b5a

File tree

3 files changed

+35
-33
lines changed

3 files changed

+35
-33
lines changed

cmd/gpu_plugin/rm/gpu_plugin_resource_manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (w *mockPodResources) Get(ctx context.Context,
9292
}
9393

9494
func newMockResourceManager(pods []v1.Pod) ResourceManager {
95-
client, err := grpc.Dial("fake", grpc.WithTransportCredentials(insecure.NewCredentials()))
95+
client, err := grpc.NewClient("fake", grpc.WithTransportCredentials(insecure.NewCredentials()))
9696
if err != nil {
9797
fmt.Fprintf(os.Stderr, "failed to create client: %v\n", err)
9898

pkg/deviceplugin/server.go

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/fsnotify/fsnotify"
2929
"github.com/pkg/errors"
3030
"google.golang.org/grpc"
31+
"google.golang.org/grpc/connectivity"
3132
"google.golang.org/grpc/credentials/insecure"
3233

3334
"k8s.io/klog/v2"
@@ -371,15 +372,9 @@ func watchFile(file string) error {
371372
}
372373

373374
func (srv *server) registerWithKubelet(kubeletSocket, pluginEndPoint, resourceName string) error {
374-
ctx := context.Background()
375-
376-
conn, err := grpc.DialContext(ctx, kubeletSocket,
377-
grpc.WithTransportCredentials(insecure.NewCredentials()),
378-
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
379-
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
380-
}))
375+
conn, err := grpc.NewClient(filepath.Join("unix://", kubeletSocket), grpc.WithTransportCredentials(insecure.NewCredentials()))
381376
if err != nil {
382-
return errors.Wrap(err, "Cannot connect to kubelet service")
377+
return errors.Wrap(err, "Cannot create a gRPC client")
383378
}
384379

385380
defer conn.Close()
@@ -392,7 +387,7 @@ func (srv *server) registerWithKubelet(kubeletSocket, pluginEndPoint, resourceNa
392387
Options: srv.getDevicePluginOptions(),
393388
}
394389

395-
_, err = client.Register(ctx, reqt)
390+
_, err = client.Register(context.Background(), reqt)
396391
if err != nil {
397392
return errors.Wrap(err, "Cannot register to kubelet service")
398393
}
@@ -403,20 +398,33 @@ func (srv *server) registerWithKubelet(kubeletSocket, pluginEndPoint, resourceNa
403398
// waitForServer checks if grpc server is alive
404399
// by making grpc blocking connection to the server socket.
405400
func waitForServer(socket string, timeout time.Duration) error {
401+
conn, err := grpc.NewClient(filepath.Join("unix://", socket), grpc.WithTransportCredentials(insecure.NewCredentials()))
402+
if err != nil {
403+
return errors.Wrap(err, "Cannot create a gRPC client")
404+
}
405+
406+
defer conn.Close()
407+
406408
ctx, cancel := context.WithTimeout(context.Background(), timeout)
407409

408410
defer cancel()
409411

410-
conn, err := grpc.DialContext(ctx, socket,
411-
grpc.WithTransportCredentials(insecure.NewCredentials()),
412-
grpc.WithBlock(),
413-
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
414-
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
415-
}),
416-
)
417-
if conn != nil {
418-
_ = conn.Close()
419-
}
412+
// A blocking dial blocks until the clientConn is ready. Based
413+
// on grpc-go's DialContext() that moved to use NewClient() but
414+
// marked DialContext() deprecated.
415+
for {
416+
state := conn.GetState()
417+
if state == connectivity.Idle {
418+
conn.Connect()
419+
}
420+
421+
if state == connectivity.Ready {
422+
return nil
423+
}
420424

421-
return errors.Wrapf(err, "Failed dial context at %s", socket)
425+
if !conn.WaitForStateChange(ctx, state) {
426+
// ctx got timeout or canceled.
427+
return errors.Wrapf(ctx.Err(), "Failed dial context at %s", socket)
428+
}
429+
}
422430
}

pkg/deviceplugin/server_test.go

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"net"
2222
"os"
2323
"path"
24+
"path/filepath"
2425
"reflect"
2526
"sync"
2627
"testing"
@@ -111,7 +112,7 @@ func (k *kubeletStub) start() error {
111112
return waitForServer(k.socket, 10*time.Second)
112113
}
113114

114-
func TestRegisterWithKublet(t *testing.T) {
115+
func TestRegisterWithKubelet(t *testing.T) {
115116
pluginSocket := path.Join(devicePluginPath, pluginEndpoint)
116117

117118
srv := newTestServer()
@@ -180,11 +181,8 @@ func TestSetupAndServe(t *testing.T) {
180181

181182
ctx := context.Background()
182183

183-
conn, err := grpc.DialContext(ctx, pluginSocket,
184-
grpc.WithTransportCredentials(insecure.NewCredentials()),
185-
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
186-
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
187-
}))
184+
conn, err := grpc.NewClient(filepath.Join("unix://", pluginSocket),
185+
grpc.WithTransportCredentials(insecure.NewCredentials()))
188186
if err != nil {
189187
t.Fatalf("Failed to get connection: %+v", err)
190188
}
@@ -231,12 +229,8 @@ func TestSetupAndServe(t *testing.T) {
231229
time.Sleep(1 * time.Second)
232230
}
233231

234-
conn, err = grpc.DialContext(ctx, pluginSocket,
235-
grpc.WithTransportCredentials(insecure.NewCredentials()),
236-
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
237-
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
238-
}))
239-
232+
conn, err = grpc.NewClient(filepath.Join("unix://", pluginSocket),
233+
grpc.WithTransportCredentials(insecure.NewCredentials()))
240234
if err != nil {
241235
t.Fatalf("Failed to get connection: %+v", err)
242236
}

0 commit comments

Comments
 (0)