Skip to content

[public-api] Refactor to use connect handlers #13692

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions components/proxy/conf/Caddyfile
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,7 @@ api.{$GITPOD_DOMAIN} {
output stdout
}

@grpc protocol grpc

handle @grpc {
# gRPC traffic goes to gRPC server
reverse_proxy h2c://public-api-server.{$KUBE_NAMESPACE}.{$KUBE_DOMAIN}:9001
}

# Non-grpc traffic goes to an HTTP server
# All traffic goes to HTTP endpoint. We handle gRPC using connect.build
reverse_proxy public-api-server.{$KUBE_NAMESPACE}.{$KUBE_DOMAIN}:9002
}

Expand Down
17 changes: 9 additions & 8 deletions components/public-api-server/pkg/apiv1/prebuild.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,24 @@ package apiv1

import (
"context"

"github.com/bufbuild/connect-go"
v1 "github.com/gitpod-io/gitpod/public-api/v1"
"github.com/gitpod-io/gitpod/public-api/v1/v1connect"
)

func NewPrebuildService() *PrebuildService {
return &PrebuildService{
UnimplementedPrebuildsServiceServer: &v1.UnimplementedPrebuildsServiceServer{},
}
return &PrebuildService{}
}

type PrebuildService struct {
*v1.UnimplementedPrebuildsServiceServer
v1connect.UnimplementedPrebuildsServiceHandler
}

func (p *PrebuildService) GetPrebuild(ctx context.Context, req *v1.GetPrebuildRequest) (*v1.GetPrebuildResponse, error) {
return &v1.GetPrebuildResponse{
func (p *PrebuildService) GetPrebuild(ctx context.Context, req *connect.Request[v1.GetPrebuildRequest]) (*connect.Response[v1.GetPrebuildResponse], error) {
return connect.NewResponse(&v1.GetPrebuildResponse{
Prebuild: &v1.Prebuild{
PrebuildId: req.GetPrebuildId(),
PrebuildId: req.Msg.GetPrebuildId(),
Spec: &v1.PrebuildSpec{
Context: &v1.WorkspaceContext{
ContextUrl: "https://github.com/gitpod-io/gitpod",
Expand All @@ -32,5 +33,5 @@ func (p *PrebuildService) GetPrebuild(ctx context.Context, req *v1.GetPrebuildRe
},
Status: nil,
},
}, nil
}), nil
}
10 changes: 6 additions & 4 deletions components/public-api-server/pkg/apiv1/prebuild_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@ package apiv1

import (
"context"
"testing"

"github.com/bufbuild/connect-go"
v1 "github.com/gitpod-io/gitpod/public-api/v1"
"github.com/stretchr/testify/require"
"testing"
)

func TestPrebuildService_GetPrebuild(t *testing.T) {
svc := NewPrebuildService()

prebuildID := "some-prebuild-id"
resp, err := svc.GetPrebuild(context.Background(), &v1.GetPrebuildRequest{
resp, err := svc.GetPrebuild(context.Background(), connect.NewRequest(&v1.GetPrebuildRequest{
PrebuildId: prebuildID,
})
}))
require.NoError(t, err)
require.Equal(t, &v1.GetPrebuildResponse{
Prebuild: &v1.Prebuild{
Expand All @@ -31,6 +33,6 @@ func TestPrebuildService_GetPrebuild(t *testing.T) {
},
Status: nil,
},
}, resp)
}, resp.Msg)

}
110 changes: 34 additions & 76 deletions components/public-api-server/pkg/apiv1/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,60 +6,48 @@ package apiv1

import (
"context"
"fmt"

connect "github.com/bufbuild/connect-go"
protocol "github.com/gitpod-io/gitpod/gitpod-protocol"
"github.com/gitpod-io/gitpod/public-api-server/pkg/auth"
"github.com/gitpod-io/gitpod/public-api-server/pkg/proxy"
v1 "github.com/gitpod-io/gitpod/public-api/v1"
"github.com/gitpod-io/gitpod/public-api/v1/v1connect"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/relvacode/iso8601"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

func NewWorkspaceService(serverConnPool proxy.ServerConnectionPool) *WorkspaceService {
return &WorkspaceService{
connectionPool: serverConnPool,
UnimplementedWorkspacesServiceServer: &v1.UnimplementedWorkspacesServiceServer{},
connectionPool: serverConnPool,
}
}

type WorkspaceService struct {
connectionPool proxy.ServerConnectionPool

*v1.UnimplementedWorkspacesServiceServer
v1connect.UnimplementedWorkspacesServiceHandler
}

func (w *WorkspaceService) GetWorkspace(ctx context.Context, r *v1.GetWorkspaceRequest) (*v1.GetWorkspaceResponse, error) {
func (s *WorkspaceService) GetWorkspace(ctx context.Context, req *connect.Request[v1.GetWorkspaceRequest]) (*connect.Response[v1.GetWorkspaceResponse], error) {
token := auth.TokenFromContext(ctx)
logger := ctxlogrus.Extract(ctx)
token, err := bearerTokenFromContext(ctx)
if err != nil {
return nil, err
}

server, err := w.connectionPool.Get(ctx, token)
server, err := s.connectionPool.Get(ctx, token)
if err != nil {
logger.WithError(err).Error("Failed to get connection to server.")
return nil, status.Error(codes.Internal, "failed to establish connection to downstream services")
return nil, connect.NewError(connect.CodeInternal, err)
}

workspace, err := server.GetWorkspace(ctx, r.GetWorkspaceId())
workspace, err := server.GetWorkspace(ctx, req.Msg.GetWorkspaceId())
if err != nil {
logger.WithError(err).Error("Failed to get workspace.")
converted := proxy.ConvertError(err)
switch status.Code(converted) {
case codes.PermissionDenied:
return nil, status.Error(codes.PermissionDenied, "insufficient permission to access workspace")
case codes.NotFound:
return nil, status.Error(codes.NotFound, "workspace does not exist")
default:
return nil, status.Error(codes.Internal, "unable to retrieve workspace")
}
return nil, proxy.ConvertError(err)
}

return &v1.GetWorkspaceResponse{
return connect.NewResponse(&v1.GetWorkspaceResponse{
Result: &v1.Workspace{
WorkspaceId: workspace.Workspace.ID,
OwnerId: workspace.Workspace.OwnerID,
Expand All @@ -73,72 +61,40 @@ func (w *WorkspaceService) GetWorkspace(ctx context.Context, r *v1.GetWorkspaceR
},
Description: workspace.Workspace.Description,
},
}, nil
}), nil
}

func (w *WorkspaceService) GetOwnerToken(ctx context.Context, r *v1.GetOwnerTokenRequest) (*v1.GetOwnerTokenResponse, error) {
func (s *WorkspaceService) GetOwnerToken(ctx context.Context, req *connect.Request[v1.GetOwnerTokenRequest]) (*connect.Response[v1.GetOwnerTokenResponse], error) {
logger := ctxlogrus.Extract(ctx)
token, err := bearerTokenFromContext(ctx)
if err != nil {
return nil, err
}
token := auth.TokenFromContext(ctx)

server, err := w.connectionPool.Get(ctx, token)
server, err := s.connectionPool.Get(ctx, token)
if err != nil {
logger.WithError(err).Error("Failed to get connection to server.")
return nil, status.Error(codes.Internal, "failed to establish connection to downstream services")
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to establish connection to downstream services"))
}

ownerToken, err := server.GetOwnerToken(ctx, r.GetWorkspaceId())
ownerToken, err := server.GetOwnerToken(ctx, req.Msg.GetWorkspaceId())

if err != nil {
logger.WithError(err).Error("Failed to get owner token.")
converted := proxy.ConvertError(err)
switch status.Code(converted) {
case codes.PermissionDenied:
return nil, status.Error(codes.PermissionDenied, "insufficient permission to retrieve ownertoken")
case codes.NotFound:
return nil, status.Error(codes.NotFound, "workspace does not exist")
default:
return nil, status.Error(codes.Internal, "unable to retrieve owner token")
}
}

return &v1.GetOwnerTokenResponse{Token: ownerToken}, nil
}

func bearerTokenFromContext(ctx context.Context) (string, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", status.Error(codes.Unauthenticated, "no credentials provided")
}

values := md.Get("authorization")
if len(values) == 0 {
return "", status.Error(codes.Unauthenticated, "no authorization header specified")
}
if len(values) > 1 {
return "", status.Error(codes.Unauthenticated, "more than one authorization header specified, exactly one is required")
return nil, proxy.ConvertError(err)
}

token := values[0]
return token, nil
return connect.NewResponse(&v1.GetOwnerTokenResponse{Token: ownerToken}), nil
}

func (w *WorkspaceService) ListWorkspaces(ctx context.Context, req *v1.ListWorkspacesRequest) (*v1.ListWorkspacesResponse, error) {
func (s *WorkspaceService) ListWorkspaces(ctx context.Context, req *connect.Request[v1.ListWorkspacesRequest]) (*connect.Response[v1.ListWorkspacesResponse], error) {
logger := ctxlogrus.Extract(ctx)
token, err := bearerTokenFromContext(ctx)
if err != nil {
return nil, err
}
token := auth.TokenFromContext(ctx)

server, err := w.connectionPool.Get(ctx, token)
server, err := s.connectionPool.Get(ctx, token)
if err != nil {
logger.WithError(err).Error("Failed to get connection to server.")
return nil, status.Error(codes.Internal, "failed to establish connection to downstream services")
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to establish connection to downstream services"))
}

limit, err := getLimitFromPagination(req.Pagination)
limit, err := getLimitFromPagination(req.Msg.GetPagination())
if err != nil {
// getLimitFromPagination returns gRPC errors
return nil, err
Expand All @@ -161,9 +117,11 @@ func (w *WorkspaceService) ListWorkspaces(ctx context.Context, req *v1.ListWorks
res = append(res, workspaceAndInstance)
}

return &v1.ListWorkspacesResponse{
Result: res,
}, nil
return connect.NewResponse(
&v1.ListWorkspacesResponse{
Result: res,
},
), nil
}

func getLimitFromPagination(pagination *v1.Pagination) (int, error) {
Expand All @@ -179,7 +137,7 @@ func getLimitFromPagination(pagination *v1.Pagination) (int, error) {
return defaultLimit, nil
}
if pagination.PageSize < 0 || maxLimit < pagination.PageSize {
return 0, grpc.Errorf(codes.InvalidArgument, "invalid pagination page size (must be 0 < x < %d)", maxLimit)
return 0, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid pagination page size (must be 0 < x < %d)", maxLimit))
}

return int(pagination.PageSize), nil
Expand All @@ -192,7 +150,7 @@ func convertWorkspaceInfo(input *protocol.WorkspaceInfo) (*v1.ListWorkspacesResp
creationTime, err := parseGitpodTimestamp(wsi.CreationTime)
if err != nil {
// TODO(cw): should this really return an error and possibly fail the entire operation?
return nil, grpc.Errorf(codes.FailedPrecondition, "cannot parse creation time: %v", err)
return nil, connect.NewError(connect.CodeFailedPrecondition, fmt.Errorf("cannot parse creation time: %v", err))
}

var phase v1.WorkspaceInstanceStatus_Phase
Expand All @@ -219,7 +177,7 @@ func convertWorkspaceInfo(input *protocol.WorkspaceInfo) (*v1.ListWorkspacesResp
phase = v1.WorkspaceInstanceStatus_PHASE_STOPPED
default:
// TODO(cw): should this really return an error and possibly fail the entire operation?
return nil, grpc.Errorf(codes.FailedPrecondition, "cannot convert instance phase: %s", wsi.Status.Phase)
return nil, connect.NewError(connect.CodeFailedPrecondition, fmt.Errorf("cannot convert instance phase: %s", wsi.Status.Phase))
}

var admissionLevel v1.AdmissionLevel
Expand Down
Loading