Skip to content

Commit 2b60567

Browse files
committed
sync: internal fixed size lock-free queue for sync.Pool
This is the first step toward fixing multiple issues with sync.Pool. This adds a fixed size, lock-free, single-producer, multi-consumer queue that will be used in the new Pool stealing implementation. For #22950, #22331. Change-Id: I50e85e3cb83a2ee71f611ada88e7f55996504bb5 Reviewed-on: https://go-review.googlesource.com/c/go/+/166957 Run-TryBot: Austin Clements <[email protected]> TryBot-Result: Gobot Gobot <[email protected]> Reviewed-by: David Chase <[email protected]>
1 parent e47ced7 commit 2b60567

File tree

3 files changed

+283
-0
lines changed

3 files changed

+283
-0
lines changed

src/sync/export_test.go

+25
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,28 @@ var Runtime_Semacquire = runtime_Semacquire
99
var Runtime_Semrelease = runtime_Semrelease
1010
var Runtime_procPin = runtime_procPin
1111
var Runtime_procUnpin = runtime_procUnpin
12+
13+
// PoolDequeue exports the internal poolDequeue operations for
// testing. Per poolDequeue's contract, PushHead and PopHead must be
// called only from a single producer goroutine; PopTail may be
// called from any number of consumer goroutines.
type PoolDequeue interface {
	PushHead(val interface{}) bool
	PopHead() (interface{}, bool)
	PopTail() (interface{}, bool)
}
19+
20+
func NewPoolDequeue(n int) PoolDequeue {
21+
return &poolDequeue{
22+
vals: make([]eface, n),
23+
}
24+
}
25+
26+
func (d *poolDequeue) PushHead(val interface{}) bool {
27+
return d.pushHead(val)
28+
}
29+
30+
func (d *poolDequeue) PopHead() (interface{}, bool) {
31+
return d.popHead()
32+
}
33+
34+
func (d *poolDequeue) PopTail() (interface{}, bool) {
35+
return d.popTail()
36+
}

src/sync/pool_test.go

+73
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,79 @@ func TestPoolStress(t *testing.T) {
150150
}
151151
}
152152

153+
// TestPoolDequeue stress-tests poolDequeue with one producer
// (PushHead, and occasionally PopHead) and P-1 concurrent PopTail
// consumers, then verifies every pushed value was popped exactly
// once.
func TestPoolDequeue(t *testing.T) {
	const P = 10
	// In long mode, do enough pushes to wrap around the 21-bit
	// indexes.
	//
	// NOTE(review): poolqueue.go packs head/tail as 32-bit indexes
	// (dequeueBits = 32), so 1<<21 pushes wrap the 16-slot ring
	// many times but not the packed indexes themselves; the
	// "21-bit" wording looks stale — confirm against poolqueue.go.
	N := 1<<21 + 1000
	if testing.Short() {
		N = 1e3
	}
	d := NewPoolDequeue(16)
	// have[i] counts how many times value i was popped; each must
	// end at exactly 1.
	have := make([]int32, N)
	var stop int32
	var wg WaitGroup

	// Start P-1 consumers.
	for i := 1; i < P; i++ {
		wg.Add(1)
		go func() {
			fail := 0
			for atomic.LoadInt32(&stop) == 0 {
				val, ok := d.PopTail()
				if ok {
					fail = 0
					atomic.AddInt32(&have[val.(int)], 1)
					// N-1 is the last value pushed, so seeing it
					// means the producer is done; tell all
					// consumers to exit.
					if val.(int) == N-1 {
						atomic.StoreInt32(&stop, 1)
					}
				} else {
					// Speed up the test by
					// allowing the pusher to run.
					if fail++; fail%100 == 0 {
						runtime.Gosched()
					}
				}
			}
			wg.Done()
		}()
	}

	// Start 1 producer.
	nPopHead := 0
	wg.Add(1)
	go func() {
		for j := 0; j < N; j++ {
			// Spin until the 16-slot queue has room.
			for !d.PushHead(j) {
				// Allow a popper to run.
				runtime.Gosched()
			}
			// Occasionally exercise the producer-side pop path.
			if j%10 == 0 {
				val, ok := d.PopHead()
				if ok {
					nPopHead++
					atomic.AddInt32(&have[val.(int)], 1)
				}
			}
		}
		wg.Done()
	}()
	wg.Wait()

	// Check results.
	for i, count := range have {
		if count != 1 {
			t.Errorf("expected have[%d] = 1, got %d", i, count)
		}
	}
	if nPopHead == 0 {
		// In theory it's possible in a valid schedule for
		// popHead to never succeed, but in practice it almost
		// always succeeds, so this is unlikely to flake.
		t.Errorf("popHead never succeeded")
	}
}
225+
153226
func BenchmarkPool(b *testing.B) {
154227
var p Pool
155228
b.RunParallel(func(pb *testing.PB) {

src/sync/poolqueue.go

+185
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
// Copyright 2019 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package sync
6+
7+
import (
8+
"sync/atomic"
9+
"unsafe"
10+
)
11+
12+
// poolDequeue is a lock-free fixed-size single-producer,
// multi-consumer queue. The single producer can both push and pop
// from the head, and consumers can pop from the tail.
//
// It has the added feature that it nils out unused slots to avoid
// unnecessary retention of objects. This is important for sync.Pool,
// but not typically a property considered in the literature.
type poolDequeue struct {
	// headTail packs together a 32-bit head index and a 32-bit
	// tail index. Both are indexes into vals, reduced modulo
	// len(vals) by masking with len(vals)-1.
	//
	// tail = index of oldest data in queue
	// head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by consumers.
	// A consumer continues to own a slot outside this range until
	// it nils the slot, at which point ownership passes to the
	// producer.
	//
	// The head index is stored in the most-significant bits so
	// that we can atomically add to it and the overflow is
	// harmless.
	headTail uint64

	// vals is a ring buffer of interface{} values stored in this
	// dequeue. The size of this must be a power of 2 (required by
	// the len(vals)-1 masking above).
	//
	// vals[i].typ is nil if the slot is empty and non-nil
	// otherwise. A slot is still in use until *both* the tail
	// index has moved beyond it and typ has been set to nil. This
	// is set to nil atomically by the consumer and read
	// atomically by the producer.
	vals []eface
}
46+
47+
// eface is the memory layout of an empty interface (interface{}):
// a type pointer and a value pointer. pushHead/popHead/popTail rely
// on this layout when they cast *eface to *interface{}; splitting
// the two words also lets typ be written and read atomically on its
// own to transfer slot ownership.
type eface struct {
	typ, val unsafe.Pointer
}
50+
51+
// dequeueBits is the width in bits of each of the head and tail
// indexes packed into poolDequeue.headTail.
const dequeueBits = 32

// dequeueLimit is the maximum size of a poolDequeue.
//
// This is half of 1<<dequeueBits because detecting fullness depends
// on wrapping around the ring buffer without wrapping around the
// index.
const dequeueLimit = (1 << dequeueBits) / 2

// dequeueNil is used in poolDequeue to represent interface{}(nil).
// Since we use nil to represent empty slots, we need a sentinel value
// to represent nil.
type dequeueNil *struct{}
64+
65+
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
66+
const mask = 1<<dequeueBits - 1
67+
head = uint32((ptrs >> dequeueBits) & mask)
68+
tail = uint32(ptrs & mask)
69+
return
70+
}
71+
72+
func (d *poolDequeue) pack(head, tail uint32) uint64 {
73+
const mask = 1<<dequeueBits - 1
74+
return (uint64(head) << dequeueBits) |
75+
uint64(tail&mask)
76+
}
77+
78+
// pushHead adds val at the head of the queue. It returns false if the
// queue is full. It must only be called by a single producer.
func (d *poolDequeue) pushHead(val interface{}) bool {
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)
	// Full when head has lapped tail by exactly len(vals) (compared
	// in index space, modulo 2^dequeueBits).
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
		// Queue is full.
		return false
	}
	slot := &d.vals[head&uint32(len(d.vals)-1)]

	// Check if the head slot has been released by popTail.
	typ := atomic.LoadPointer(&slot.typ)
	if typ != nil {
		// Another goroutine is still cleaning up the tail, so
		// the queue is actually still full.
		return false
	}

	// The head slot is free, so we own it.
	if val == nil {
		// A nil value would be indistinguishable from an empty
		// slot (slot.typ == nil), so box it in the sentinel;
		// the pop paths unbox it back to nil.
		val = dequeueNil(nil)
	}
	// Write both eface words at once; this relies on eface matching
	// the layout of interface{}.
	*(*interface{})(unsafe.Pointer(slot)) = val

	// Increment head. This passes ownership of slot to popTail
	// and acts as a store barrier for writing the slot.
	atomic.AddUint64(&d.headTail, 1<<dequeueBits)
	return true
}
108+
109+
// popHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
func (d *poolDequeue) popHead() (interface{}, bool) {
	var slot *eface
	// CAS loop: retry when a concurrent popTail moves headTail
	// between our load and our swap.
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm tail and decrement head. We do this before
		// reading the value to take back ownership of this
		// slot.
		head--
		ptrs2 := d.pack(head, tail)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// We successfully took back slot.
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
	}

	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		// Unbox the sentinel pushHead stored for interface{}(nil).
		val = nil
	}
	// Zero the slot. Unlike popTail, this isn't racing with
	// pushHead, so we don't need to be careful here.
	*slot = eface{}
	return val, true
}
143+
144+
// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
func (d *poolDequeue) popTail() (interface{}, bool) {
	var slot *eface
	// CAS loop: competing consumers (and the producer) may move
	// headTail between our load and our swap; retry until we win
	// a slot or see the queue empty.
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm head and tail (for our speculative check
		// above) and increment tail. If this succeeds, then
		// we own the slot at tail.
		ptrs2 := d.pack(head, tail+1)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// Success.
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	}

	// We now own slot.
	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		// Unbox the sentinel pushHead stored for interface{}(nil).
		val = nil
	}

	// Tell pushHead that we're done with this slot. Zeroing the
	// slot is also important so we don't leave behind references
	// that could keep this object live longer than necessary.
	//
	// We write to val first and then publish that we're done with
	// this slot by atomically writing to typ.
	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)
	// At this point pushHead owns the slot.

	return val, true
}

0 commit comments

Comments
 (0)