@@ -78,9 +78,9 @@ type WorkspaceInfo struct {
78
78
// (parsed from URL)
79
79
IDEPublicPort string
80
80
81
- Ports []PortInfo
82
- Auth * wsapi.WorkspaceAuthentication
83
- StartedAt time. Time
81
+ Ports []PortInfo
82
+ Auth * wsapi.WorkspaceAuthentication
83
+ Phase wsapi. WorkspacePhase
84
84
}
85
85
86
86
// PortInfo contains all information ws-proxy needs to know about a workspace port
@@ -266,7 +266,7 @@ func (p *RemoteWorkspaceInfoProvider) listen(client wsapi.WorkspaceManagerClient
266
266
}
267
267
268
268
if status .Phase == wsapi .WorkspacePhase_STOPPED {
269
- p .cache .Delete (status .Metadata .MetaId , status .Metadata . StartedAt . AsTime () )
269
+ p .cache .Delete (status .Metadata .MetaId , status .Id )
270
270
} else {
271
271
info := mapWorkspaceStatusToInfo (status )
272
272
p .cache .Insert (info )
@@ -310,11 +310,11 @@ func mapWorkspaceStatusToInfo(status *wsapi.WorkspaceStatus) *WorkspaceInfo {
310
310
IDEPublicPort : getPortStr (status .Spec .Url ),
311
311
Ports : portInfos ,
312
312
Auth : status .Auth ,
313
- StartedAt : status .Metadata . StartedAt . AsTime () ,
313
+ Phase : status .Phase ,
314
314
}
315
315
}
316
316
317
- // WorkspaceInfo return the WorkspaceInfo avaiable for the given workspaceID.
317
+ // WorkspaceInfo return the WorkspaceInfo available for the given workspaceID.
318
318
// Callers should make sure their context gets canceled properly. For good measure
319
319
// this function will timeout by itself as well.
320
320
func (p * RemoteWorkspaceInfoProvider ) WorkspaceInfo (ctx context.Context , workspaceID string ) * WorkspaceInfo {
@@ -360,7 +360,7 @@ func getPortStr(urlStr string) string {
360
360
// workspaceInfoCache stores WorkspaceInfo in a manner which is easy to query for WorkspaceInfoProvider
361
361
type workspaceInfoCache struct {
362
362
// WorkspaceInfos indexed by workspaceID
363
- infos map [string ]* WorkspaceInfo
363
+ infos map [string ][] * WorkspaceInfo
364
364
// WorkspaceCoords indexed by public (proxy) port (string)
365
365
coordsByPublicPort map [string ]* WorkspaceCoords
366
366
@@ -373,7 +373,7 @@ type workspaceInfoCache struct {
373
373
func newWorkspaceInfoCache () * workspaceInfoCache {
374
374
var mu sync.RWMutex
375
375
return & workspaceInfoCache {
376
- infos : make (map [string ]* WorkspaceInfo ),
376
+ infos : make (map [string ][] * WorkspaceInfo ),
377
377
coordsByPublicPort : make (map [string ]* WorkspaceCoords ),
378
378
mu : & mu ,
379
379
cond : sync .NewCond (& mu ),
@@ -384,7 +384,7 @@ func (c *workspaceInfoCache) Reinit(infos []*WorkspaceInfo) {
384
384
c .cond .L .Lock ()
385
385
defer c .cond .L .Unlock ()
386
386
387
- c .infos = make (map [string ]* WorkspaceInfo , len (infos ))
387
+ c .infos = make (map [string ][] * WorkspaceInfo , len (infos ))
388
388
c .coordsByPublicPort = make (map [string ]* WorkspaceCoords , len (c .coordsByPublicPort ))
389
389
390
390
for _ , info := range infos {
@@ -402,19 +402,17 @@ func (c *workspaceInfoCache) Insert(info *WorkspaceInfo) {
402
402
}
403
403
404
404
func (c * workspaceInfoCache ) doInsert (info * WorkspaceInfo ) {
405
- existing := c .infos [info .WorkspaceID ]
406
- if existing != nil {
407
- // Do not insert if the current workspace is newer
408
- // This works around issues when a workspace is deleted then restarted in quick succession
409
- // and the stopping event occurs after the replocement pod is started
410
- if ! existing .StartedAt .IsZero () && ! info .StartedAt .IsZero () {
411
- if existing .StartedAt .After (info .StartedAt ) {
412
- log .WithField ("workspaceID" , existing .WorkspaceID ).Debug ("ignoring insert of older workspace" )
413
- return
414
- }
405
+ existingInfos := c .infos [info .WorkspaceID ]
406
+ // Only insert if this is a new instance
407
+ for i , existingInfo := range existingInfos {
408
+ if existingInfo .InstanceID == info .InstanceID {
409
+ c.infos [info.WorkspaceID ][i ] = info
410
+ log .WithField ("info" , info ).Debug ("ignoring insert of existing instance for workspace" )
411
+ return
415
412
}
416
413
}
417
- c .infos [info .WorkspaceID ] = info
414
+ log .WithField ("workspaceID" , info .WorkspaceID ).WithField ("instanceID" , info .InstanceID ).Debug ("adding instance for workspace" )
415
+ c .infos [info .WorkspaceID ] = append (c .infos [info .WorkspaceID ], info )
418
416
c .coordsByPublicPort [info .IDEPublicPort ] = & WorkspaceCoords {
419
417
ID : info .WorkspaceID ,
420
418
}
@@ -427,34 +425,57 @@ func (c *workspaceInfoCache) doInsert(info *WorkspaceInfo) {
427
425
}
428
426
}
429
427
430
- func (c * workspaceInfoCache ) Delete (workspaceID string , startedAt time. Time ) {
428
+ func (c * workspaceInfoCache ) Delete (workspaceID string , instanceID string ) {
431
429
c .cond .L .Lock ()
432
430
defer c .cond .L .Unlock ()
433
431
434
- info , present := c .infos [workspaceID ]
435
- if ! present || info == nil {
432
+ infos , present := c .infos [workspaceID ]
433
+ if ! present || infos == nil {
436
434
return
437
435
}
438
- // Do not delete if the current workspace info is newer
439
- // This works around issues when a workspace is deleted then restarted in quick succession
440
- // and the delete event occurs after the replocement pod is started
441
- if ! startedAt . IsZero () && ! info . StartedAt . IsZero () {
442
- if info . StartedAt . After ( startedAt ) {
443
- log . WithField ( "workspaceID" , workspaceID ). WithField ( "startedAt" , startedAt ). WithField ( " info" , info ). Debug ( "ignoring delete of older workspace" )
436
+ // Delete only the matching instance ID
437
+ for _ , info := range infos {
438
+ if info . InstanceID == instanceID {
439
+ log . WithField ( "workspaceID" , workspaceID ). WithField ( "instanceID" , instanceID ). Debug ( "deleting instance for workspace" )
440
+ delete ( c . infos , instanceID )
441
+ delete ( c . coordsByPublicPort , info . IDEPublicPort )
444
442
return
445
443
}
446
444
}
447
- delete (c .coordsByPublicPort , info .IDEPublicPort )
448
- delete (c .infos , workspaceID )
445
+ log .WithField ("workspaceID" , workspaceID ).WithField ("instanceID" , instanceID ).Debug ("ignoring delete of missing instance for workspace" )
449
446
}
450
447
451
448
// WaitFor waits for workspace info until that info is available or the context is canceled.
452
449
func (c * workspaceInfoCache ) WaitFor (ctx context.Context , workspaceID string ) (w * WorkspaceInfo , ok bool ) {
453
450
c .mu .RLock ()
454
- w , ok = c .infos [workspaceID ]
451
+ existing , ok : = c .infos [workspaceID ]
455
452
c .mu .RUnlock ()
453
+
454
+ getCandidate := func (infos []* WorkspaceInfo ) * WorkspaceInfo {
455
+ // Bail early...
456
+ if len (infos ) == 1 {
457
+ return infos [0 ]
458
+ }
459
+ // Find the *best* candidate i.e. prefer any instance ramping up over stopping/stopped
460
+ // TODO(rl): make this a smarter container? i.e. priorised by (smart) status
461
+ candidate := infos [0 ]
462
+ for _ , info := range infos [1 :] {
463
+ if info .Phase < wsapi .WorkspacePhase_INTERRUPTED {
464
+ if candidate .Phase > info .Phase {
465
+ candidate = info
466
+ }
467
+ } else {
468
+ // Prefer interrupted
469
+ if candidate .Phase < info .Phase {
470
+ candidate = info
471
+ }
472
+ }
473
+ }
474
+ return candidate
475
+ }
476
+
456
477
if ok {
457
- return
478
+ return getCandidate ( existing ), true
458
479
}
459
480
460
481
inc := make (chan * WorkspaceInfo )
@@ -469,12 +490,12 @@ func (c *workspaceInfoCache) WaitFor(ctx context.Context, workspaceID string) (w
469
490
return
470
491
}
471
492
472
- info , ok := c .infos [workspaceID ]
493
+ infos , ok := c .infos [workspaceID ]
473
494
if ! ok {
474
495
continue
475
496
}
476
497
477
- inc <- info
498
+ inc <- getCandidate ( infos )
478
499
return
479
500
}
480
501
}()
0 commit comments