From a8c3602bf56c018adc57a6f6f1c1692ad17525e7 Mon Sep 17 00:00:00 2001 From: Huiwen Date: Mon, 26 May 2025 06:17:41 +0000 Subject: [PATCH 1/4] [supervisor] fix ports forwarding hangs issue --- .../supervisor/pkg/ports/exposed-ports.go | 6 +++--- components/supervisor/pkg/ports/ports.go | 20 ++++++++++++------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/components/supervisor/pkg/ports/exposed-ports.go b/components/supervisor/pkg/ports/exposed-ports.go index 4cd0972044e11e..79bb4ddfefd9ee 100644 --- a/components/supervisor/pkg/ports/exposed-ports.go +++ b/components/supervisor/pkg/ports/exposed-ports.go @@ -87,9 +87,9 @@ func NewGitpodExposedPorts(workspaceID string, instanceID string, workspaceUrl s WorkspaceUrl: workspaceUrl, gitpodService: gitpodService, - // allow clients to submit 30 expose requests without blocking - requests: make(chan *exposePortRequest, 30), - localExposedNotice: make(chan struct{}, 30), + // allow clients to submit 3000 expose requests without blocking + requests: make(chan *exposePortRequest, 3000), + localExposedNotice: make(chan struct{}, 3000), } } diff --git a/components/supervisor/pkg/ports/ports.go b/components/supervisor/pkg/ports/ports.go index 1b36c4101f4339..8f1d87b8fe9cd1 100644 --- a/components/supervisor/pkg/ports/ports.go +++ b/components/supervisor/pkg/ports/ports.go @@ -45,7 +45,8 @@ func NewManager(exposed ExposedPortsInterface, served ServedPortsObserver, confi C: config, T: tunneled, - forceUpdates: make(chan struct{}, 1), + forceUpdates: make(chan struct{}, 1), + closeSubscriptions: make(chan *Subscription, maxSubscriptions), internal: internal, proxies: make(map[uint32]*localhostProxy), @@ -80,7 +81,8 @@ type Manager struct { C ConfigInterace T TunneledPortsInterface - forceUpdates chan struct{} + forceUpdates chan struct{} + closeSubscriptions chan *Subscription internal map[uint32]struct{} proxies map[uint32]*localhostProxy @@ -172,6 +174,10 @@ func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) { select { case <-pm.forceUpdates: forceUpdate = true + case sub := <-pm.closeSubscriptions: + pm.mu.Lock() + delete(pm.subscriptions, sub) + pm.mu.Unlock() case exposed = <-exposedUpdates: if exposed == nil { if ctx.Err() == nil { @@ -772,14 +778,14 @@ func (pm *Manager) Subscribe() (*Subscription, error) { sub := &Subscription{updates: make(chan []*api.PortsStatus, 5)} var once sync.Once sub.Close = func() error { - pm.mu.Lock() - defer pm.mu.Unlock() - once.Do(func() { close(sub.updates) }) - delete(pm.subscriptions, sub) - + select { + case pm.closeSubscriptions <- sub: + default: + log.Error("closeSubscriptions channel is full") + } return nil } pm.subscriptions[sub] = struct{}{} From 81ecbce6aa8413876a78eb5a81ab453806be847a Mon Sep 17 00:00:00 2001 From: Huiwen Date: Mon, 26 May 2025 06:39:44 +0000 Subject: [PATCH 2/4] fix unit tests --- components/supervisor/pkg/ports/ports.go | 5 +++-- components/supervisor/pkg/ports/ports_test.go | 11 ++++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/components/supervisor/pkg/ports/ports.go b/components/supervisor/pkg/ports/ports.go index 8f1d87b8fe9cd1..c9ce463299fac2 100644 --- a/components/supervisor/pkg/ports/ports.go +++ b/components/supervisor/pkg/ports/ports.go @@ -46,7 +46,7 @@ func NewManager(exposed ExposedPortsInterface, served ServedPortsObserver, confi T: tunneled, forceUpdates: make(chan struct{}, 1), - closeSubscriptions: make(chan *Subscription, maxSubscriptions), + closeSubscriptions: make(chan *Subscription, maxSubscriptions*2), internal: internal, proxies: make(map[uint32]*localhostProxy), @@ -772,7 +772,8 @@ func (pm *Manager) Subscribe() (*Subscription, error) { } if len(pm.subscriptions) > maxSubscriptions { - return nil, ErrTooManySubscriptions + return nil, fmt.Errorf("too many subscriptions: %d", len(pm.subscriptions)) + // return nil, ErrTooManySubscriptions } sub := &Subscription{updates: make(chan []*api.PortsStatus, 5)} diff --git a/components/supervisor/pkg/ports/ports_test.go b/components/supervisor/pkg/ports/ports_test.go index 27b05ae4d54cd5..26154bca3bf4a6 100644 --- a/components/supervisor/pkg/ports/ports_test.go +++ b/components/supervisor/pkg/ports/ports_test.go @@ -864,8 +864,8 @@ func TestPortsConcurrentSubscribe(t *testing.T) { } }() - eg, _ := errgroup.WithContext(context.Background()) for i := 0; i < maxSubscriptions; i++ { + eg, _ := errgroup.WithContext(context.Background()) eg.Go(func() error { for j := 0; j < subscribes; j++ { sub, err := pm.Subscribe() @@ -882,12 +882,13 @@ func TestPortsConcurrentSubscribe(t *testing.T) { } return nil }) + err := eg.Wait() + if err != nil { + t.Fatal(err) + } + time.Sleep(50 * time.Millisecond) } - err := eg.Wait() close(subscribing) - if err != nil { - t.Fatal(err) - } wg.Wait() } From a720018df380cf5a420fda41f904f9e2e73a5590 Mon Sep 17 00:00:00 2001 From: Huiwen Date: Mon, 26 May 2025 07:07:59 +0000 Subject: [PATCH 3/4] Fix lock issue --- components/supervisor/pkg/ports/ports.go | 22 ++++++++----------- components/supervisor/pkg/ports/ports_test.go | 4 ++-- .../supervisor/pkg/supervisor/services.go | 2 +- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/components/supervisor/pkg/ports/ports.go b/components/supervisor/pkg/ports/ports.go index c9ce463299fac2..31d358a82e2617 100644 --- a/components/supervisor/pkg/ports/ports.go +++ b/components/supervisor/pkg/ports/ports.go @@ -127,7 +127,7 @@ type managedPort struct { // Subscription is a Subscription to status updates type Subscription struct { updates chan []*api.PortsStatus - Close func() error + Close func(lock bool) error } // Updates returns the updates channel @@ -153,7 +153,7 @@ func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) { pm.mu.Unlock() for _, s := range subs { - _ = s.Close() + _ = s.Close(true) } }() defer cancel() @@ -174,10 +174,6 @@ func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) { select { case <-pm.forceUpdates: forceUpdate = true - case sub := <-pm.closeSubscriptions: - pm.mu.Lock() - delete(pm.subscriptions, sub) - pm.mu.Unlock() case exposed = <-exposedUpdates: if exposed == nil { if ctx.Err() == nil { @@ -330,7 +326,7 @@ func (pm *Manager) updateState(ctx context.Context, exposed []ExposedPort, serve case sub.updates <- status: case <-time.After(5 * time.Second): log.Error("ports subscription droped out") - _ = sub.Close() + _ = sub.Close(false) } } } @@ -778,15 +774,15 @@ func (pm *Manager) Subscribe() (*Subscription, error) { sub := &Subscription{updates: make(chan []*api.PortsStatus, 5)} var once sync.Once - sub.Close = func() error { + sub.Close = func(lock bool) error { + if lock { + pm.mu.Lock() + defer pm.mu.Unlock() + } once.Do(func() { close(sub.updates) }) - select { - case pm.closeSubscriptions <- sub: - default: - log.Error("closeSubscriptions channel is full") - } + delete(pm.subscriptions, sub) return nil } pm.subscriptions[sub] = struct{}{} diff --git a/components/supervisor/pkg/ports/ports_test.go b/components/supervisor/pkg/ports/ports_test.go index 26154bca3bf4a6..cb6ee407371621 100644 --- a/components/supervisor/pkg/ports/ports_test.go +++ b/components/supervisor/pkg/ports/ports_test.go @@ -684,7 +684,7 @@ func TestPortsUpdateState(t *testing.T) { } go func() { defer wg.Done() - defer sub.Close() + defer sub.Close(true) for up := range sub.Updates() { updts = append(updts, up) @@ -878,7 +878,7 @@ func TestPortsConcurrentSubscribe(t *testing.T) { // update case <-sub.Updates(): } - sub.Close() + sub.Close(true) } return nil }) diff --git a/components/supervisor/pkg/supervisor/services.go b/components/supervisor/pkg/supervisor/services.go index f701b15fd3b4bd..025eb6dfc44bdf 100644 --- a/components/supervisor/pkg/supervisor/services.go +++ b/components/supervisor/pkg/supervisor/services.go @@ -243,7 +243,7 @@ func (s *statusService) PortsStatus(req *api.PortsStatusRequest, srv api.StatusS if err != nil { return status.Error(codes.Internal, err.Error()) } - defer sub.Close() + defer sub.Close(true) for { select { From 4a87c0643aa008266a99871bbdabefcfad1ca2e9 Mon Sep 17 00:00:00 2001 From: Huiwen Date: Mon, 26 May 2025 19:02:27 +0000 Subject: [PATCH 4/4] remove useless code --- components/supervisor/pkg/ports/ports.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/components/supervisor/pkg/ports/ports.go b/components/supervisor/pkg/ports/ports.go index 31d358a82e2617..b9928f683da531 100644 --- a/components/supervisor/pkg/ports/ports.go +++ b/components/supervisor/pkg/ports/ports.go @@ -45,8 +45,7 @@ func NewManager(exposed ExposedPortsInterface, served ServedPortsObserver, confi C: config, T: tunneled, - forceUpdates: make(chan struct{}, 1), - closeSubscriptions: make(chan *Subscription, maxSubscriptions*2), + forceUpdates: make(chan struct{}, 1), internal: internal, proxies: make(map[uint32]*localhostProxy), @@ -81,8 +80,7 @@ type Manager struct { C ConfigInterace T TunneledPortsInterface - forceUpdates chan struct{} - closeSubscriptions chan *Subscription + forceUpdates chan struct{} internal map[uint32]struct{} proxies map[uint32]*localhostProxy