Skip to content

Commit 57bb7be

Browse files
committed
sync: internal dynamically sized lock-free queue for sync.Pool
This adds a dynamically sized, lock-free, single-producer, multi-consumer queue that will be used in the new Pool stealing implementation. It's built on top of the fixed-size queue added in the previous CL. For #22950, #22331. Change-Id: Ifc0ca3895bec7e7f9289ba9fb7dd0332bf96ba5a Reviewed-on: https://go-review.googlesource.com/c/go/+/166958 Run-TryBot: Austin Clements <[email protected]> TryBot-Result: Gobot Gobot <[email protected]> Reviewed-by: David Chase <[email protected]>
1 parent 2b60567 commit 57bb7be

File tree

3 files changed

+153
-5
lines changed

3 files changed

+153
-5
lines changed

src/sync/export_test.go

+17
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,20 @@ func (d *poolDequeue) PopHead() (interface{}, bool) {
3434
func (d *poolDequeue) PopTail() (interface{}, bool) {
3535
return d.popTail()
3636
}
37+
38+
func NewPoolChain() PoolDequeue {
39+
return new(poolChain)
40+
}
41+
42+
func (c *poolChain) PushHead(val interface{}) bool {
43+
c.pushHead(val)
44+
return true
45+
}
46+
47+
func (c *poolChain) PopHead() (interface{}, bool) {
48+
return c.popHead()
49+
}
50+
51+
func (c *poolChain) PopTail() (interface{}, bool) {
52+
return c.popTail()
53+
}

src/sync/pool_test.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -151,14 +151,21 @@ func TestPoolStress(t *testing.T) {
151151
}
152152

153153
func TestPoolDequeue(t *testing.T) {
154+
testPoolDequeue(t, NewPoolDequeue(16))
155+
}
156+
157+
func TestPoolChain(t *testing.T) {
158+
testPoolDequeue(t, NewPoolChain())
159+
}
160+
161+
func testPoolDequeue(t *testing.T, d PoolDequeue) {
154162
const P = 10
155163
// In long mode, do enough pushes to wrap around the 21-bit
156164
// indexes.
157165
N := 1<<21 + 1000
158166
if testing.Short() {
159167
N = 1e3
160168
}
161-
d := NewPoolDequeue(16)
162169
have := make([]int32, N)
163170
var stop int32
164171
var wg WaitGroup

src/sync/poolqueue.go

+128-4
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,10 @@ const dequeueBits = 32
5252

5353
// dequeueLimit is the maximum size of a poolDequeue.
5454
//
55-
// This is half of 1<<dequeueBits because detecting fullness depends
56-
// on wrapping around the ring buffer without wrapping around the
57-
// index.
58-
const dequeueLimit = (1 << dequeueBits) / 2
55+
// This must be at most (1<<dequeueBits)/2 because detecting fullness
56+
// depends on wrapping around the ring buffer without wrapping around
57+
// the index. We divide by 4 so this fits in an int on 32-bit.
58+
const dequeueLimit = (1 << dequeueBits) / 4
5959

6060
// dequeueNil is used in poolDeqeue to represent interface{}(nil).
6161
// Since we use nil to represent empty slots, we need a sentinel value
@@ -183,3 +183,127 @@ func (d *poolDequeue) popTail() (interface{}, bool) {
183183

184184
return val, true
185185
}
186+
187+
// poolChain is a dynamically-sized version of poolDequeue.
188+
//
189+
// This is implemented as a doubly-linked list queue of poolDequeues
190+
// where each dequeue is double the size of the previous one. Once a
191+
// dequeue fills up, this allocates a new one and only ever pushes to
192+
// the latest dequeue. Pops happen from the other end of the list and
193+
// once a dequeue is exhausted, it gets removed from the list.
194+
type poolChain struct {
195+
// head is the poolDequeue to push to. This is only accessed
196+
// by the producer, so doesn't need to be synchronized.
197+
head *poolChainElt
198+
199+
// tail is the poolDequeue to popTail from. This is accessed
200+
// by consumers, so reads and writes must be atomic.
201+
tail *poolChainElt
202+
}
203+
204+
type poolChainElt struct {
205+
poolDequeue
206+
207+
// next and prev link to the adjacent poolChainElts in this
208+
// poolChain.
209+
//
210+
// next is written atomically by the producer and read
211+
// atomically by the consumer. It only transitions from nil to
212+
// non-nil.
213+
//
214+
// prev is written atomically by the consumer and read
215+
// atomically by the producer. It only transitions from
216+
// non-nil to nil.
217+
next, prev *poolChainElt
218+
}
219+
220+
func storePoolChainElt(pp **poolChainElt, v *poolChainElt) {
221+
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
222+
}
223+
224+
func loadPoolChainElt(pp **poolChainElt) *poolChainElt {
225+
return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
226+
}
227+
228+
func (c *poolChain) pushHead(val interface{}) {
229+
d := c.head
230+
if d == nil {
231+
// Initialize the chain.
232+
const initSize = 8 // Must be a power of 2
233+
d = new(poolChainElt)
234+
d.vals = make([]eface, initSize)
235+
c.head = d
236+
storePoolChainElt(&c.tail, d)
237+
}
238+
239+
if d.pushHead(val) {
240+
return
241+
}
242+
243+
// The current dequeue is full. Allocate a new one of twice
244+
// the size.
245+
newSize := len(d.vals) * 2
246+
if newSize >= dequeueLimit {
247+
// Can't make it any bigger.
248+
newSize = dequeueLimit
249+
}
250+
251+
d2 := &poolChainElt{prev: d}
252+
d2.vals = make([]eface, newSize)
253+
c.head = d2
254+
storePoolChainElt(&d.next, d2)
255+
d2.pushHead(val)
256+
}
257+
258+
func (c *poolChain) popHead() (interface{}, bool) {
259+
d := c.head
260+
for d != nil {
261+
if val, ok := d.popHead(); ok {
262+
return val, ok
263+
}
264+
// There may still be unconsumed elements in the
265+
// previous dequeue, so try backing up.
266+
d = loadPoolChainElt(&d.prev)
267+
}
268+
return nil, false
269+
}
270+
271+
func (c *poolChain) popTail() (interface{}, bool) {
272+
d := loadPoolChainElt(&c.tail)
273+
if d == nil {
274+
return nil, false
275+
}
276+
277+
for {
278+
// It's important that we load the next pointer
279+
// *before* popping the tail. In general, d may be
280+
// transiently empty, but if next is non-nil before
281+
// the pop and the pop fails, then d is permanently
282+
// empty, which is the only condition under which it's
283+
// safe to drop d from the chain.
284+
d2 := loadPoolChainElt(&d.next)
285+
286+
if val, ok := d.popTail(); ok {
287+
return val, ok
288+
}
289+
290+
if d2 == nil {
291+
// This is the only dequeue. It's empty right
292+
// now, but could be pushed to in the future.
293+
return nil, false
294+
}
295+
296+
// The tail of the chain has been drained, so move on
297+
// to the next dequeue. Try to drop it from the chain
298+
// so the next pop doesn't have to look at the empty
299+
// dequeue again.
300+
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
301+
// We won the race. Clear the prev pointer so
302+
// the garbage collector can collect the empty
303+
// dequeue and so popHead doesn't back up
304+
// further than necessary.
305+
storePoolChainElt(&d2.prev, nil)
306+
}
307+
d = d2
308+
}
309+
}

0 commit comments

Comments
 (0)