@@ -12,6 +12,7 @@ import (
12
12
"strconv"
13
13
"strings"
14
14
"sync"
15
+ "sync/atomic"
15
16
"time"
16
17
)
17
18
@@ -40,6 +41,11 @@ type TreeEntry struct {
40
41
sizeOnce sync.Once
41
42
}
42
43
44
+ // Mode returns the entry mode of the tree entry.
45
+ func (e * TreeEntry ) Mode () EntryMode {
46
+ return e .mode
47
+ }
48
+
43
49
// IsTree returns true if the entry itself is another tree (i.e. a directory).
44
50
func (e * TreeEntry ) IsTree () bool {
45
51
return e .mode == EntryTree
@@ -104,9 +110,10 @@ func (e *TreeEntry) Blob() *Blob {
104
110
}
105
111
}
106
112
113
+ // Entries is a list of tree entries that can be sorted through its Less, Swap and Sort methods.
107
114
type Entries []* TreeEntry
108
115
109
- var sorter = []func (t1 , t2 * TreeEntry ) bool {
116
+ var sorters = []func (t1 , t2 * TreeEntry ) bool {
110
117
func (t1 , t2 * TreeEntry ) bool {
111
118
return (t1 .IsTree () || t1 .IsCommit ()) && ! t2 .IsTree () && ! t2 .IsCommit ()
112
119
},
@@ -120,138 +127,145 @@ func (es Entries) Swap(i, j int) { es[i], es[j] = es[j], es[i] }
120
127
func (es Entries ) Less (i , j int ) bool {
121
128
t1 , t2 := es [i ], es [j ]
122
129
var k int
123
- for k = 0 ; k < len (sorter )- 1 ; k ++ {
124
- sort := sorter [k ]
130
+ for k = 0 ; k < len (sorters )- 1 ; k ++ {
131
+ sorter := sorters [k ]
125
132
switch {
126
- case sort (t1 , t2 ):
133
+ case sorter (t1 , t2 ):
127
134
return true
128
- case sort (t2 , t1 ):
135
+ case sorter (t2 , t1 ):
129
136
return false
130
137
}
131
138
}
132
- return sorter [k ](t1 , t2 )
139
+ return sorters [k ](t1 , t2 )
133
140
}
134
141
135
142
// Sort sorts the entries in place by handing them to sort.Sort, which orders
// them according to Less. sort.Sort does not guarantee a stable order for
// entries that compare as equal.
func (es Entries ) Sort () {
136
143
sort .Sort (es )
137
144
}
138
145
139
- var defaultConcurrency = runtime .NumCPU ()
140
-
141
- type commitInfo struct {
142
- entryName string
143
- infos []interface {}
144
- err error
146
+ // EntryCommitInfo contains a tree entry with its commit information.
147
+ type EntryCommitInfo struct {
148
+ entry * TreeEntry // The tree entry this information describes.
149
+ commit * Commit // The commit returned by CommitByPath for the entry's path; filled in by Entries.CommitsInfo.
150
+ submodule * Submodule // Filled in by Entries.CommitsInfo only for entries whose IsCommit is true (submodules).
145
151
}
146
152
147
- // CommitsInfo takes advantages of concurrency to speed up getting information
148
- // of all commits that are corresponding to these entries. This method will automatically
149
- // choose the right number of goroutine (concurrency) to use related of the host CPU.
150
- func (es Entries ) CommitsInfo (timeout time.Duration , commit * Commit , treePath string ) ([][]interface {}, error ) {
151
- return es .CommitsInfoWithCustomConcurrency (timeout , commit , treePath , 0 )
153
+ // CommitsInfoOptions contains optional arguments for getting commits information.
154
+ type CommitsInfoOptions struct {
155
+ // The path prefix that is joined with each entry name to build the path used for lookups (presumably the path of the tree holding the entries, relative to the repository root).
156
+ Path string
157
+ // The maximum number of goroutines to be used for getting commits information.
158
+ // When not set (i.e. <=0), runtime.GOMAXPROCS is used to determine the value.
159
+ MaxConcurrency int
160
+ // The timeout duration before giving up for each shell command execution.
161
+ // The default timeout duration will be used when not supplied.
162
+ Timeout time.Duration
152
163
}
153
164
154
- // CommitsInfoWithCustomConcurrency takes advantages of concurrency to speed up getting information
155
- // of all commits that are corresponding to these entries. If the given maxConcurrency is negative or
156
- // equal to zero: the right number of goroutine (concurrency) to use will be choosen related of the
157
- // host CPU.
158
- func (es Entries ) CommitsInfoWithCustomConcurrency (timeout time.Duration , commit * Commit , treePath string , maxConcurrency int ) ([][]interface {}, error ) {
165
// defaultConcurrency is the number of goroutines Entries.CommitsInfo uses when
// CommitsInfoOptions.MaxConcurrency is not positive. Calling runtime.GOMAXPROCS
// with 0 only reports the current setting; it does not change it.
+ var defaultConcurrency = runtime .GOMAXPROCS (0 )
166
+
167
+ // CommitsInfo returns a list of commit information for these tree entries in the state of
168
+ // given commit and subpath. It takes advantages of concurrency to speed up the process.
169
+ // The returned list has the same number of items as tree entries, so the caller can access
170
+ // them via slice indices.
171
+ func (es Entries ) CommitsInfo (commit * Commit , opts ... CommitsInfoOptions ) ([]* EntryCommitInfo , error ) {
159
172
if len (es ) == 0 {
160
- return nil , nil
173
+ return [] * EntryCommitInfo {} , nil
161
174
}
162
175
163
- if maxConcurrency <= 0 {
164
- maxConcurrency = defaultConcurrency
176
+ var opt CommitsInfoOptions
177
+ if len (opts ) > 0 {
178
+ opt = opts [0 ]
165
179
}
166
180
167
- // Length of taskChan determines how many goroutines (subprocesses) can run at the same time.
168
- // The length of revChan should be same as taskChan so goroutines whoever finished job can
169
- // exit as early as possible, only store data inside channel.
170
- taskChan := make (chan bool , maxConcurrency )
171
- revChan := make (chan commitInfo , maxConcurrency )
172
- doneChan := make (chan error )
181
+ if opt .MaxConcurrency <= 0 {
182
+ opt .MaxConcurrency = defaultConcurrency
183
+ }
184
+
185
+ // Length of bucket determines how many goroutines (subprocesses) can run at the same time.
186
+ bucket := make (chan struct {}, opt .MaxConcurrency )
187
+ results := make (chan * EntryCommitInfo , len (es ))
188
+ errs := make (chan error , 1 )
189
+
190
+ var errored int64
191
+ hasErrored := func () bool {
192
+ return atomic .LoadInt64 (& errored ) != 0
193
+ }
194
+ // Only count for the first error, discard the rest
195
+ setError := func (err error ) {
196
+ if ! atomic .CompareAndSwapInt64 (& errored , 0 , 1 ) {
197
+ return
198
+ }
199
+ errs <- err
200
+ }
173
201
174
- // Receive loop will exit when it collects same number of data pieces as tree entries.
175
- // It notifies doneChan before exits or notify early with possible error.
176
- infoMap := make (map [string ][]interface {}, len (es ))
202
+ var wg sync.WaitGroup
203
+ wg .Add (len (es ))
177
204
go func () {
178
- i := 0
179
- for info := range revChan {
180
- if info . err != nil {
181
- doneChan <- info . err
205
+ for i , e := range es {
206
+ // Shrink down the counter and exit when there is an error
207
+ if hasErrored () {
208
+ wg . Add ( i - len ( es ))
182
209
return
183
210
}
184
211
185
- infoMap [info .entryName ] = info .infos
186
- i ++
187
- if i == len (es ) {
188
- break
189
- }
190
- }
191
- doneChan <- nil
192
- }()
212
+ // Block until there is an empty slot to control the maximum concurrency
213
+ bucket <- struct {}{}
193
214
194
- for i := range es {
195
- // When taskChan is idle (or has empty slots), put operation will not block.
196
- // However when taskChan is full, code will block and wait any running goroutines to finish.
197
- taskChan <- true
198
-
199
- if es [i ].typ != ObjectCommit {
200
- go func (i int ) {
201
- cinfo := commitInfo {entryName : es [i ].Name ()}
202
- c , err := commit .CommitByPath (CommitByRevisionOptions {
203
- Path : path .Join (treePath , es [i ].Name ()),
204
- Timeout : timeout ,
215
+ go func (e * TreeEntry ) {
216
+ defer func () {
217
+ wg .Done ()
218
+ <- bucket
219
+ }()
220
+
221
+ // Avoid expensive operations if has errored
222
+ if hasErrored () {
223
+ return
224
+ }
225
+
226
+ info := & EntryCommitInfo {
227
+ entry : e ,
228
+ }
229
+ epath := path .Join (opt .Path , e .Name ())
230
+
231
+ var err error
232
+ info .commit , err = commit .CommitByPath (CommitByRevisionOptions {
233
+ Path : epath ,
234
+ Timeout : opt .Timeout ,
205
235
})
206
236
if err != nil {
207
- cinfo .err = fmt .Errorf ("get commit by path (%s/%s): %v" , treePath , es [i ].Name (), err )
208
- } else {
209
- cinfo .infos = []interface {}{es [i ], c }
237
+ setError (fmt .Errorf ("get commit by path %q: %v" , epath , err ))
238
+ return
210
239
}
211
- revChan <- cinfo
212
- <- taskChan // Clear one slot from taskChan to allow new goroutines to start.
213
- }(i )
214
- continue
215
- }
216
-
217
- // Handle submodule
218
- go func (i int ) {
219
- cinfo := commitInfo {entryName : es [i ].Name ()}
220
- sm , err := commit .Submodule (path .Join (treePath , es [i ].Name ()))
221
- if err != nil && err != ErrSubmoduleNotExist {
222
- cinfo .err = fmt .Errorf ("get submodule (%s/%s): %v" , treePath , es [i ].Name (), err )
223
- revChan <- cinfo
224
- return
225
- }
226
240
227
- c , err := commit .CommitByPath (CommitByRevisionOptions {
228
- Path : path .Join (treePath , es [i ].Name ()),
229
- Timeout : timeout ,
230
- })
231
- if err != nil {
232
- cinfo .err = fmt .Errorf ("get commit by path (%s/%s): %v" , treePath , es [i ].Name (), err )
233
- } else {
234
- cinfo .infos = []interface {}{
235
- es [i ],
236
- & SubmoduleEntry {
237
- id : es [i ].id ,
238
- Submodule : sm ,
239
- Commit : c ,
240
- },
241
+ // Get extra information for submodules
242
+ if e .IsCommit () {
243
+ info .submodule , err = commit .Submodule (epath )
244
+ if err != nil {
245
+ setError (fmt .Errorf ("get submodule %q: %v" , epath , err ))
246
+ return
247
+ }
241
248
}
242
- }
243
- revChan <- cinfo
244
- <- taskChan
245
- }(i )
249
+
250
+ results <- info
251
+ }(e )
252
+ }
253
+ }()
254
+
255
+ wg .Wait ()
256
+ if hasErrored () {
257
+ return nil , <- errs
246
258
}
247
259
248
- if err := <- doneChan ; err != nil {
249
- return nil , err
260
+ close (results )
261
+ infos := make (map [[20 ]byte ]* EntryCommitInfo , len (es ))
262
+ for info := range results {
263
+ infos [info .entry .id .bytes ] = info
250
264
}
251
265
252
- commitsInfo := make ([][] interface {} , len (es ))
253
- for i := 0 ; i < len ( es ); i ++ {
254
- commitsInfo [i ] = infoMap [ es [ i ]. Name () ]
266
+ commitsInfo := make ([]* EntryCommitInfo , len (es ))
267
+ for i , e := range es {
268
+ commitsInfo [i ] = infos [ e . id . bytes ]
255
269
}
256
270
return commitsInfo , nil
257
271
}
0 commit comments