@@ -78,8 +78,10 @@ type WorkspaceInfo struct {
78
78
// (parsed from URL)
79
79
IDEPublicPort string
80
80
81
- Ports []PortInfo
82
- Auth * wsapi.WorkspaceAuthentication
81
+ Ports []PortInfo
82
+ Auth * wsapi.WorkspaceAuthentication
83
+ Phase wsapi.WorkspacePhase
84
+ StartedAt time.Time
83
85
}
84
86
85
87
// PortInfo contains all information ws-proxy needs to know about a workspace port
@@ -265,7 +267,7 @@ func (p *RemoteWorkspaceInfoProvider) listen(client wsapi.WorkspaceManagerClient
265
267
}
266
268
267
269
if status .Phase == wsapi .WorkspacePhase_STOPPED {
268
- p .cache .Delete (status .Metadata .MetaId )
270
+ p .cache .Delete (status .Metadata .MetaId , status . Id )
269
271
} else {
270
272
info := mapWorkspaceStatusToInfo (status )
271
273
p .cache .Insert (info )
@@ -309,10 +311,12 @@ func mapWorkspaceStatusToInfo(status *wsapi.WorkspaceStatus) *WorkspaceInfo {
309
311
IDEPublicPort : getPortStr (status .Spec .Url ),
310
312
Ports : portInfos ,
311
313
Auth : status .Auth ,
314
+ Phase : status .Phase ,
315
+ StartedAt : status .Metadata .StartedAt .AsTime (),
312
316
}
313
317
}
314
318
315
- // WorkspaceInfo return the WorkspaceInfo avaiable for the given workspaceID.
319
+ // WorkspaceInfo return the WorkspaceInfo available for the given workspaceID.
316
320
// Callers should make sure their context gets canceled properly. For good measure
317
321
// this function will timeout by itself as well.
318
322
func (p * RemoteWorkspaceInfoProvider ) WorkspaceInfo (ctx context.Context , workspaceID string ) * WorkspaceInfo {
@@ -355,10 +359,14 @@ func getPortStr(urlStr string) string {
355
359
return portURL .Port ()
356
360
}
357
361
362
+ // TODO(rl): type def workspaceID and instanceID throughout for better readability and type safety?
363
+ type workspaceInfosByInstance map [string ]* WorkspaceInfo
364
+ type instanceInfosByWorkspace map [string ]workspaceInfosByInstance
365
+
358
366
// workspaceInfoCache stores WorkspaceInfo in a manner which is easy to query for WorkspaceInfoProvider
359
367
type workspaceInfoCache struct {
360
- // WorkspaceInfos indexed by workspaceID
361
- infos map [ string ] * WorkspaceInfo
368
+ // WorkspaceInfos indexed by workspaceID and then instanceID
369
+ infos instanceInfosByWorkspace
362
370
// WorkspaceCoords indexed by public (proxy) port (string)
363
371
coordsByPublicPort map [string ]* WorkspaceCoords
364
372
@@ -371,7 +379,7 @@ type workspaceInfoCache struct {
371
379
func newWorkspaceInfoCache () * workspaceInfoCache {
372
380
var mu sync.RWMutex
373
381
return & workspaceInfoCache {
374
- infos : make (map [string ]* WorkspaceInfo ),
382
+ infos : make (map [string ]workspaceInfosByInstance ),
375
383
coordsByPublicPort : make (map [string ]* WorkspaceCoords ),
376
384
mu : & mu ,
377
385
cond : sync .NewCond (& mu ),
@@ -382,7 +390,7 @@ func (c *workspaceInfoCache) Reinit(infos []*WorkspaceInfo) {
382
390
c .cond .L .Lock ()
383
391
defer c .cond .L .Unlock ()
384
392
385
- c .infos = make (map [string ]* WorkspaceInfo , len (infos ))
393
+ c .infos = make (map [string ]workspaceInfosByInstance , len (infos ))
386
394
c .coordsByPublicPort = make (map [string ]* WorkspaceCoords , len (c .coordsByPublicPort ))
387
395
388
396
for _ , info := range infos {
@@ -400,7 +408,15 @@ func (c *workspaceInfoCache) Insert(info *WorkspaceInfo) {
400
408
}
401
409
402
410
func (c * workspaceInfoCache ) doInsert (info * WorkspaceInfo ) {
403
- c .infos [info .WorkspaceID ] = info
411
+ existingInfos , ok := c .infos [info .WorkspaceID ]
412
+ if ! ok {
413
+ existingInfos = make (map [string ]* WorkspaceInfo )
414
+ }
415
+ existingInfos [info .InstanceID ] = info
416
+ c .infos [info .WorkspaceID ] = existingInfos
417
+
418
+ // NOTE: in the unlikely event that a subsequent insert changes the port
419
+ // then we add it here assuming that the delete will remove it
404
420
c .coordsByPublicPort [info .IDEPublicPort ] = & WorkspaceCoords {
405
421
ID : info .WorkspaceID ,
406
422
}
@@ -413,25 +429,66 @@ func (c *workspaceInfoCache) doInsert(info *WorkspaceInfo) {
413
429
}
414
430
}
415
431
416
- func (c * workspaceInfoCache ) Delete (workspaceID string ) {
432
+ func (c * workspaceInfoCache ) Delete (workspaceID string , instanceID string ) {
417
433
c .cond .L .Lock ()
418
434
defer c .cond .L .Unlock ()
419
435
420
- info , present := c .infos [workspaceID ]
421
- if ! present || info == nil {
436
+ infos , present := c .infos [workspaceID ]
437
+ if ! present || infos == nil {
422
438
return
423
439
}
424
- delete (c .coordsByPublicPort , info .IDEPublicPort )
425
- delete (c .infos , workspaceID )
440
+
441
+ // Keep only the active instances and public port(s) for the workspace id
442
+ var instanceIDEPublicPort string
443
+ if info , ok := infos [instanceID ]; ok {
444
+ delete (infos , instanceID )
445
+ instanceIDEPublicPort = info .IDEPublicPort
446
+ }
447
+
448
+ if len (infos ) == 0 {
449
+ delete (c .infos , workspaceID )
450
+ } else {
451
+ c .infos [workspaceID ] = infos
452
+ }
453
+
454
+ // Clean up the public port if port no longer active
455
+ activePublicPorts := make (map [string ]interface {})
456
+ for _ , info := range infos {
457
+ activePublicPorts [info .IDEPublicPort ] = true
458
+ }
459
+
460
+ if _ , ok := activePublicPorts [instanceIDEPublicPort ]; ! ok {
461
+ log .WithField ("workspaceID" , workspaceID ).WithField ("instanceID" , instanceID ).WithField ("port" , instanceIDEPublicPort ).Debug ("deleting port for workspace" )
462
+ delete (c .coordsByPublicPort , instanceIDEPublicPort )
463
+ }
426
464
}
427
465
428
466
// WaitFor waits for workspace info until that info is available or the context is canceled.
429
467
func (c * workspaceInfoCache ) WaitFor (ctx context.Context , workspaceID string ) (w * WorkspaceInfo , ok bool ) {
430
468
c .mu .RLock ()
431
- w , ok = c .infos [workspaceID ]
469
+ existing , ok : = c .infos [workspaceID ]
432
470
c .mu .RUnlock ()
471
+
472
+ getCandidate := func (infos map [string ]* WorkspaceInfo ) * WorkspaceInfo {
473
+ // Find the *best* candidate i.e. prefer any instance running over stopping/stopped
474
+ // If there are >1 instances in running state it will pick the most recently started
475
+ // as opposed to the random iteration order (which is harder to test)
476
+ // NOTE: Yes, these is a potential issue with clock drift
477
+ // but this scenario is extremely unlikely to happen in the wild
478
+ var candidate * WorkspaceInfo
479
+ for _ , info := range infos {
480
+ if candidate == nil ||
481
+ (info .Phase == wsapi .WorkspacePhase_RUNNING &&
482
+ (candidate .Phase != info .Phase || info .StartedAt .After (candidate .StartedAt ))) ||
483
+ (candidate .Phase > wsapi .WorkspacePhase_RUNNING && candidate .Phase < info .Phase ) {
484
+ candidate = info
485
+ }
486
+ }
487
+ return candidate
488
+ }
489
+
433
490
if ok {
434
- return
491
+ return getCandidate ( existing ), true
435
492
}
436
493
437
494
inc := make (chan * WorkspaceInfo )
@@ -446,12 +503,12 @@ func (c *workspaceInfoCache) WaitFor(ctx context.Context, workspaceID string) (w
446
503
return
447
504
}
448
505
449
- info , ok := c .infos [workspaceID ]
506
+ infos , ok := c .infos [workspaceID ]
450
507
if ! ok {
451
508
continue
452
509
}
453
510
454
- inc <- info
511
+ inc <- getCandidate ( infos )
455
512
return
456
513
}
457
514
}()
0 commit comments