Skip to content
This repository was archived by the owner on Sep 9, 2020. It is now read-only.

Commit a9b7c18

Browse files
authored
Merge pull request #546 from jmank88/source_gateway_simp
sourceCoordinator source gateway concurrency simplification
2 parents 5540a42 + 12e6393 commit a9b7c18

File tree

1 file changed

+11
-19
lines changed

1 file changed

+11
-19
lines changed

internal/gps/source.go

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -82,29 +82,20 @@ func (sc *sourceCoordinator) getSourceGatewayFor(ctx context.Context, id Project
8282

8383
// No gateway exists for this path yet; set up a proto, being careful to fold
8484
// together simultaneous attempts on the same path.
85-
rc := srcReturnChans{
86-
ret: make(chan *sourceGateway),
87-
err: make(chan error),
88-
}
89-
90-
// The rest of the work needs its own goroutine, the results of which will
91-
// be re-joined to this call via the return chans.
92-
go sc.setUpSourceGateway(ctx, normalizedName, rc)
93-
return rc.awaitReturn()
94-
}
95-
96-
// Not intended to be called externally - call getSourceGatewayFor instead.
97-
func (sc *sourceCoordinator) setUpSourceGateway(ctx context.Context, normalizedName string, rc srcReturnChans) {
9885
sc.psrcmut.Lock()
9986
if chans, has := sc.protoSrcs[normalizedName]; has {
10087
// Another goroutine is already working on this normalizedName. Fold
10188
// in with that work by attaching our return channels to the list.
89+
rc := srcReturnChans{
90+
ret: make(chan *sourceGateway, 1),
91+
err: make(chan error, 1),
92+
}
10293
sc.protoSrcs[normalizedName] = append(chans, rc)
10394
sc.psrcmut.Unlock()
104-
return
95+
return rc.awaitReturn()
10596
}
10697

107-
sc.protoSrcs[normalizedName] = []srcReturnChans{rc}
98+
sc.protoSrcs[normalizedName] = []srcReturnChans{}
10899
sc.psrcmut.Unlock()
109100

110101
doReturn := func(sg *sourceGateway, err error) {
@@ -130,7 +121,7 @@ func (sc *sourceCoordinator) setUpSourceGateway(ctx context.Context, normalizedN
130121
// As in the deducer, don't cache errors so that externally-driven retry
131122
// strategies can be constructed.
132123
doReturn(nil, err)
133-
return
124+
return nil, err
134125
}
135126

136127
// It'd be quite the feat - but not impossible - for a gateway
@@ -144,7 +135,7 @@ func (sc *sourceCoordinator) setUpSourceGateway(ctx context.Context, normalizedN
144135
if srcGate, has := sc.srcs[url]; has {
145136
sc.srcmut.RUnlock()
146137
doReturn(srcGate, nil)
147-
return
138+
return srcGate, nil
148139
}
149140
panic(fmt.Sprintf("%q was URL for %q in nameToURL, but no corresponding srcGate in srcs map", url, normalizedName))
150141
}
@@ -165,7 +156,7 @@ func (sc *sourceCoordinator) setUpSourceGateway(ctx context.Context, normalizedN
165156
url, err := srcGate.sourceURL(ctx)
166157
if err != nil {
167158
doReturn(nil, err)
168-
return
159+
return nil, err
169160
}
170161

171162
// We know we have a working srcGateway at this point, and need to
@@ -178,11 +169,12 @@ func (sc *sourceCoordinator) setUpSourceGateway(ctx context.Context, normalizedN
178169
if sa, has := sc.srcs[url]; has {
179170
// URL already had an entry in the main map; use that as the result.
180171
doReturn(sa, nil)
181-
return
172+
return sa, nil
182173
}
183174

184175
sc.srcs[url] = srcGate
185176
doReturn(srcGate, nil)
177+
return srcGate, nil
186178
}
187179

188180
// sourceGateways manage all incoming calls for data from sources, serializing

0 commit comments

Comments
 (0)