Skip to content

Commit 70b2bb5

Browse files
committed
[ws-proxy] Use instance ID to not stomp on newer workspaces when handling events
1 parent 5d15d0c commit 70b2bb5

File tree

2 files changed

+821
-20
lines changed

2 files changed

+821
-20
lines changed

components/ws-proxy/pkg/proxy/infoprovider.go

Lines changed: 68 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ type WorkspaceInfo struct {
8080

8181
Ports []PortInfo
8282
Auth *wsapi.WorkspaceAuthentication
83+
Phase wsapi.WorkspacePhase
8384
}
8485

8586
// PortInfo contains all information ws-proxy needs to know about a workspace port
@@ -265,7 +266,7 @@ func (p *RemoteWorkspaceInfoProvider) listen(client wsapi.WorkspaceManagerClient
265266
}
266267

267268
if status.Phase == wsapi.WorkspacePhase_STOPPED {
268-
p.cache.Delete(status.Metadata.MetaId)
269+
p.cache.Delete(status.Metadata.MetaId, status.Id)
269270
} else {
270271
info := mapWorkspaceStatusToInfo(status)
271272
p.cache.Insert(info)
@@ -309,10 +310,11 @@ func mapWorkspaceStatusToInfo(status *wsapi.WorkspaceStatus) *WorkspaceInfo {
309310
IDEPublicPort: getPortStr(status.Spec.Url),
310311
Ports: portInfos,
311312
Auth: status.Auth,
313+
Phase: status.Phase,
312314
}
313315
}
314316

315-
// WorkspaceInfo return the WorkspaceInfo avaiable for the given workspaceID.
317+
// WorkspaceInfo return the WorkspaceInfo available for the given workspaceID.
316318
// Callers should make sure their context gets canceled properly. For good measure
317319
// this function will timeout by itself as well.
318320
func (p *RemoteWorkspaceInfoProvider) WorkspaceInfo(ctx context.Context, workspaceID string) *WorkspaceInfo {
@@ -355,10 +357,14 @@ func getPortStr(urlStr string) string {
355357
return portURL.Port()
356358
}
357359

360+
// TODO(rl): type def workspaceID and instanceID throughout for better readability and type safety?
361+
type workspaceInfosByInstance map[string]*WorkspaceInfo
362+
type instanceInfosByWorkspace map[string]workspaceInfosByInstance
363+
358364
// workspaceInfoCache stores WorkspaceInfo in a manner which is easy to query for WorkspaceInfoProvider
359365
type workspaceInfoCache struct {
360-
// WorkspaceInfos indexed by workspaceID
361-
infos map[string]*WorkspaceInfo
366+
// WorkspaceInfos indexed by workspaceID and then instanceID
367+
infos instanceInfosByWorkspace
362368
// WorkspaceCoords indexed by public (proxy) port (string)
363369
coordsByPublicPort map[string]*WorkspaceCoords
364370

@@ -371,7 +377,7 @@ type workspaceInfoCache struct {
371377
func newWorkspaceInfoCache() *workspaceInfoCache {
372378
var mu sync.RWMutex
373379
return &workspaceInfoCache{
374-
infos: make(map[string]*WorkspaceInfo),
380+
infos: make(map[string]workspaceInfosByInstance),
375381
coordsByPublicPort: make(map[string]*WorkspaceCoords),
376382
mu: &mu,
377383
cond: sync.NewCond(&mu),
@@ -382,7 +388,7 @@ func (c *workspaceInfoCache) Reinit(infos []*WorkspaceInfo) {
382388
c.cond.L.Lock()
383389
defer c.cond.L.Unlock()
384390

385-
c.infos = make(map[string]*WorkspaceInfo, len(infos))
391+
c.infos = make(map[string]workspaceInfosByInstance, len(infos))
386392
c.coordsByPublicPort = make(map[string]*WorkspaceCoords, len(c.coordsByPublicPort))
387393

388394
for _, info := range infos {
@@ -400,7 +406,15 @@ func (c *workspaceInfoCache) Insert(info *WorkspaceInfo) {
400406
}
401407

402408
func (c *workspaceInfoCache) doInsert(info *WorkspaceInfo) {
403-
c.infos[info.WorkspaceID] = info
409+
existingInfos, ok := c.infos[info.WorkspaceID]
410+
if !ok {
411+
existingInfos = make(map[string]*WorkspaceInfo)
412+
}
413+
existingInfos[info.InstanceID] = info
414+
c.infos[info.WorkspaceID] = existingInfos
415+
416+
// NOTE: in the unlikely event that a subsequent insert changes the port
417+
// then we add it here assuming that the delete will remove it
404418
c.coordsByPublicPort[info.IDEPublicPort] = &WorkspaceCoords{
405419
ID: info.WorkspaceID,
406420
}
@@ -413,25 +427,63 @@ func (c *workspaceInfoCache) doInsert(info *WorkspaceInfo) {
413427
}
414428
}
415429

416-
func (c *workspaceInfoCache) Delete(workspaceID string) {
430+
func (c *workspaceInfoCache) Delete(workspaceID string, instanceID string) {
417431
c.cond.L.Lock()
418432
defer c.cond.L.Unlock()
419433

420-
info, present := c.infos[workspaceID]
421-
if !present || info == nil {
434+
infos, present := c.infos[workspaceID]
435+
if !present || infos == nil {
422436
return
423437
}
424-
delete(c.coordsByPublicPort, info.IDEPublicPort)
425-
delete(c.infos, workspaceID)
438+
439+
// Keep only the active instances and public port(s) for the workspace id
440+
var instanceIDEPublicPort string
441+
if info, ok := infos[instanceID]; ok {
442+
delete(infos, instanceID)
443+
instanceIDEPublicPort = info.IDEPublicPort
444+
}
445+
446+
if len(infos) == 0 {
447+
delete(c.infos, workspaceID)
448+
} else {
449+
c.infos[workspaceID] = infos
450+
}
451+
452+
// Clean up the public port if port no longer active
453+
activePublicPorts := make(map[string]interface{})
454+
for _, info := range infos {
455+
activePublicPorts[info.IDEPublicPort] = true
456+
}
457+
458+
if _, ok := activePublicPorts[instanceIDEPublicPort]; !ok {
459+
log.WithField("workspaceID", workspaceID).WithField("instanceID", instanceID).WithField("port", instanceIDEPublicPort).Debug("deleting port for workspace")
460+
delete(c.coordsByPublicPort, instanceIDEPublicPort)
461+
}
426462
}
427463

428464
// WaitFor waits for workspace info until that info is available or the context is canceled.
429465
func (c *workspaceInfoCache) WaitFor(ctx context.Context, workspaceID string) (w *WorkspaceInfo, ok bool) {
430466
c.mu.RLock()
431-
w, ok = c.infos[workspaceID]
467+
existing, ok := c.infos[workspaceID]
432468
c.mu.RUnlock()
469+
470+
getCandidate := func(infos map[string]*WorkspaceInfo) *WorkspaceInfo {
471+
// Find the *best* candidate i.e. prefer any instance running over stopping/stopped
472+
// If there are >1 instances in running state it will pick the first one
473+
// TODO(rl): make this a smarter container? i.e. priorised by (smart) status
474+
var candidate *WorkspaceInfo
475+
for _, info := range infos {
476+
if candidate == nil || (info.Phase == wsapi.WorkspacePhase_RUNNING && candidate.Phase != info.Phase) {
477+
candidate = info
478+
} else if candidate.Phase < info.Phase {
479+
candidate = info
480+
}
481+
}
482+
return candidate
483+
}
484+
433485
if ok {
434-
return
486+
return getCandidate(existing), true
435487
}
436488

437489
inc := make(chan *WorkspaceInfo)
@@ -446,12 +498,12 @@ func (c *workspaceInfoCache) WaitFor(ctx context.Context, workspaceID string) (w
446498
return
447499
}
448500

449-
info, ok := c.infos[workspaceID]
501+
infos, ok := c.infos[workspaceID]
450502
if !ok {
451503
continue
452504
}
453505

454-
inc <- info
506+
inc <- getCandidate(infos)
455507
return
456508
}
457509
}()

0 commit comments

Comments
 (0)