diff --git a/components/supervisor/pkg/ports/exposed-ports.go b/components/supervisor/pkg/ports/exposed-ports.go index 4cd0972044e11e..79bb4ddfefd9ee 100644 --- a/components/supervisor/pkg/ports/exposed-ports.go +++ b/components/supervisor/pkg/ports/exposed-ports.go @@ -87,9 +87,9 @@ func NewGitpodExposedPorts(workspaceID string, instanceID string, workspaceUrl s WorkspaceUrl: workspaceUrl, gitpodService: gitpodService, - // allow clients to submit 30 expose requests without blocking - requests: make(chan *exposePortRequest, 30), - localExposedNotice: make(chan struct{}, 30), + // allow clients to submit 3000 expose requests without blocking + requests: make(chan *exposePortRequest, 3000), + localExposedNotice: make(chan struct{}, 3000), } } diff --git a/components/supervisor/pkg/ports/ports.go b/components/supervisor/pkg/ports/ports.go index 1b36c4101f4339..b9928f683da531 100644 --- a/components/supervisor/pkg/ports/ports.go +++ b/components/supervisor/pkg/ports/ports.go @@ -125,7 +125,7 @@ type managedPort struct { // Subscription is a Subscription to status updates type Subscription struct { updates chan []*api.PortsStatus - Close func() error + Close func(lock bool) error } // Updates returns the updates channel @@ -151,7 +151,7 @@ func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) { pm.mu.Unlock() for _, s := range subs { - _ = s.Close() + _ = s.Close(true) } }() defer cancel() @@ -324,7 +324,7 @@ func (pm *Manager) updateState(ctx context.Context, exposed []ExposedPort, serve case sub.updates <- status: case <-time.After(5 * time.Second): log.Error("ports subscription droped out") - _ = sub.Close() + _ = sub.Close(false) } } } @@ -766,20 +766,21 @@ func (pm *Manager) Subscribe() (*Subscription, error) { } if len(pm.subscriptions) > maxSubscriptions { - return nil, ErrTooManySubscriptions + return nil, fmt.Errorf("too many subscriptions: %d", len(pm.subscriptions)) + // return nil, ErrTooManySubscriptions } sub := &Subscription{updates: make(chan []*api.PortsStatus, 5)} var once sync.Once - sub.Close = func() error { - pm.mu.Lock() - defer pm.mu.Unlock() - + sub.Close = func(lock bool) error { + if lock { + pm.mu.Lock() + defer pm.mu.Unlock() + } once.Do(func() { close(sub.updates) }) delete(pm.subscriptions, sub) - return nil } pm.subscriptions[sub] = struct{}{} diff --git a/components/supervisor/pkg/ports/ports_test.go b/components/supervisor/pkg/ports/ports_test.go index 27b05ae4d54cd5..cb6ee407371621 100644 --- a/components/supervisor/pkg/ports/ports_test.go +++ b/components/supervisor/pkg/ports/ports_test.go @@ -684,7 +684,7 @@ func TestPortsUpdateState(t *testing.T) { } go func() { defer wg.Done() - defer sub.Close() + defer sub.Close(true) for up := range sub.Updates() { updts = append(updts, up) @@ -864,8 +864,8 @@ func TestPortsConcurrentSubscribe(t *testing.T) { } }() - eg, _ := errgroup.WithContext(context.Background()) for i := 0; i < maxSubscriptions; i++ { + eg, _ := errgroup.WithContext(context.Background()) eg.Go(func() error { for j := 0; j < subscribes; j++ { sub, err := pm.Subscribe() @@ -878,16 +878,17 @@ func TestPortsConcurrentSubscribe(t *testing.T) { // update case <-sub.Updates(): } - sub.Close() + sub.Close(true) } return nil }) + err := eg.Wait() + if err != nil { + t.Fatal(err) + } + time.Sleep(50 * time.Millisecond) } - err := eg.Wait() close(subscribing) - if err != nil { - t.Fatal(err) - } wg.Wait() } diff --git a/components/supervisor/pkg/supervisor/services.go b/components/supervisor/pkg/supervisor/services.go index f701b15fd3b4bd..025eb6dfc44bdf 100644 --- a/components/supervisor/pkg/supervisor/services.go +++ b/components/supervisor/pkg/supervisor/services.go @@ -243,7 +243,7 @@ func (s *statusService) PortsStatus(req *api.PortsStatusRequest, srv api.StatusS if err != nil { return status.Error(codes.Internal, err.Error()) } - defer sub.Close() + defer sub.Close(true) for { select {