1
1
package util
2
2
3
3
import (
4
+ "context"
4
5
"sync"
5
6
"unsafe"
6
7
@@ -37,17 +38,18 @@ func StringsClone(s string) string {
37
38
38
39
// MergeSlicesParallel merge sorted slices in parallel
39
40
// using the MergeSortedSlices function
40
- func MergeSlicesParallel (parallelism int , a ... []string ) []string {
41
+ func MergeSlicesParallel (ctx context. Context , parallelism int , a ... []string ) ( []string , error ) {
41
42
if parallelism <= 1 {
42
- return MergeSortedSlices (a ... )
43
+ return MergeSortedSlices (ctx , a ... )
43
44
}
44
45
if len (a ) == 0 {
45
- return nil
46
+ return nil , nil
46
47
}
47
48
if len (a ) == 1 {
48
- return a [0 ]
49
+ return a [0 ], nil
49
50
}
50
51
c := make (chan []string , len (a ))
52
+ errCh := make (chan error , 1 )
51
53
wg := sync.WaitGroup {}
52
54
var r [][]string
53
55
p := min (parallelism , len (a )/ 2 )
@@ -57,21 +59,31 @@ func MergeSlicesParallel(parallelism int, a ...[]string) []string {
57
59
wg .Add (1 )
58
60
go func (i int ) {
59
61
m := min (len (a ), i + batchSize )
60
- c <- MergeSortedSlices (a [i :m ]... )
62
+ r , e := MergeSortedSlices (ctx , a [i :m ]... )
63
+ if e != nil {
64
+ errCh <- e
65
+ wg .Done ()
66
+ return
67
+ }
68
+ c <- r
61
69
wg .Done ()
62
70
}(i )
63
71
}
64
72
65
73
go func () {
66
74
wg .Wait ()
67
75
close (c )
76
+ close (errCh )
68
77
}()
69
78
79
+ if err := <- errCh ; err != nil {
80
+ return nil , err
81
+ }
70
82
for s := range c {
71
83
r = append (r , s )
72
84
}
73
85
74
- return MergeSortedSlices (r ... )
86
+ return MergeSortedSlices (ctx , r ... )
75
87
}
76
88
77
89
func NewStringListIter (s []string ) * StringListIter {
@@ -98,9 +110,9 @@ var MAX_STRING = string([]byte{0xff})
98
110
99
111
// MergeSortedSlices merges a set of sorted string slices into a single ones
100
112
// while removing all duplicates.
101
- func MergeSortedSlices (a ... []string ) []string {
113
+ func MergeSortedSlices (ctx context. Context , a ... []string ) ( []string , error ) {
102
114
if len (a ) == 1 {
103
- return a [0 ]
115
+ return a [0 ], nil
104
116
}
105
117
its := make ([]* StringListIter , 0 , len (a ))
106
118
sumLengh := 0
@@ -111,16 +123,19 @@ func MergeSortedSlices(a ...[]string) []string {
111
123
lt := loser .New (its , MAX_STRING )
112
124
113
125
if sumLengh == 0 {
114
- return []string {}
126
+ return []string {}, nil
115
127
}
116
128
117
129
r := make ([]string , 0 , sumLengh * 2 / 10 )
118
130
var current string
119
131
for lt .Next () {
132
+ if ctx .Err () != nil {
133
+ return nil , ctx .Err ()
134
+ }
120
135
if lt .At () != current {
121
136
current = lt .At ()
122
137
r = append (r , current )
123
138
}
124
139
}
125
- return r
140
+ return r , nil
126
141
}
0 commit comments