Skip to content

Commit 5a06fca

Browse files
jbaianlancetaylor
authored andcommitted
semaphore: add a weighted semaphore implementation
This package provides a weighted semaphore that is context-aware. The code is derived from a similar package inside Google. Change-Id: Id1dad96d79e8ccfd289e4299e8265aa5bdad3a5b Reviewed-on: https://go-review.googlesource.com/38298 Reviewed-by: Ian Lance Taylor <[email protected]> Reviewed-by: Damien Neil <[email protected]> Run-TryBot: Ian Lance Taylor <[email protected]> TryBot-Result: Gobot Gobot <[email protected]>
1 parent a60ad46 commit 5a06fca

File tree

3 files changed

+429
-0
lines changed

3 files changed

+429
-0
lines changed

semaphore/semaphore.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Copyright 2017 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
// Package semaphore provides a weighted semaphore implementation.
6+
package semaphore // import "golang.org/x/sync/semaphore"
7+
8+
import (
9+
"container/list"
10+
"sync"
11+
12+
// Use the old context because packages that depend on this one
13+
// (e.g. cloud.google.com/go/...) must run on Go 1.6.
14+
// TODO(jba): update to "context" when possible.
15+
"golang.org/x/net/context"
16+
)
17+
18+
type waiter struct {
19+
n int64
20+
ready chan<- struct{} // Closed when semaphore acquired.
21+
}
22+
23+
// NewWeighted creates a new weighted semaphore with the given
24+
// maximum combined weight for concurrent access.
25+
func NewWeighted(n int64) *Weighted {
26+
w := &Weighted{size: n}
27+
return w
28+
}
29+
30+
// Weighted provides a way to bound concurrent access to a resource.
31+
// The callers can request access with a given weight.
32+
type Weighted struct {
33+
size int64
34+
cur int64
35+
mu sync.Mutex
36+
waiters list.List
37+
}
38+
39+
// Acquire acquires the semaphore with a weight of n, blocking only until ctx
40+
// is done. On success, returns nil. On failure, returns ctx.Err() and leaves
41+
// the semaphore unchanged.
42+
//
43+
// If ctx is already done, Acquire may still succeed without blocking.
44+
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
45+
s.mu.Lock()
46+
if s.size-s.cur >= n && s.waiters.Len() == 0 {
47+
s.cur += n
48+
s.mu.Unlock()
49+
return nil
50+
}
51+
52+
if n > s.size {
53+
// Don't make other Acquire calls block on one that's doomed to fail.
54+
s.mu.Unlock()
55+
<-ctx.Done()
56+
return ctx.Err()
57+
}
58+
59+
ready := make(chan struct{})
60+
w := waiter{n: n, ready: ready}
61+
elem := s.waiters.PushBack(w)
62+
s.mu.Unlock()
63+
64+
select {
65+
case <-ctx.Done():
66+
err := ctx.Err()
67+
s.mu.Lock()
68+
select {
69+
case <-ready:
70+
// Acquired the semaphore after we were canceled. Rather than trying to
71+
// fix up the queue, just pretend we didn't notice the cancelation.
72+
err = nil
73+
default:
74+
s.waiters.Remove(elem)
75+
}
76+
s.mu.Unlock()
77+
return err
78+
79+
case <-ready:
80+
return nil
81+
}
82+
}
83+
84+
// TryAcquire acquires the semaphore with a weight of n without blocking.
85+
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
86+
func (s *Weighted) TryAcquire(n int64) bool {
87+
s.mu.Lock()
88+
success := s.size-s.cur >= n && s.waiters.Len() == 0
89+
if success {
90+
s.cur += n
91+
}
92+
s.mu.Unlock()
93+
return success
94+
}
95+
96+
// Release releases the semaphore with a weight of n.
97+
func (s *Weighted) Release(n int64) {
98+
s.mu.Lock()
99+
s.cur -= n
100+
if s.cur < 0 {
101+
s.mu.Unlock()
102+
panic("semaphore: bad release")
103+
}
104+
for {
105+
next := s.waiters.Front()
106+
if next == nil {
107+
break // No more waiters blocked.
108+
}
109+
110+
w := next.Value.(waiter)
111+
if s.size-s.cur < w.n {
112+
// Not enough tokens for the next waiter. We could keep going (to try to
113+
// find a waiter with a smaller request), but under load that could cause
114+
// starvation for large requests; instead, we leave all remaining waiters
115+
// blocked.
116+
//
117+
// Consider a semaphore used as a read-write lock, with N tokens, N
118+
// readers, and one writer. Each reader can Acquire(1) to obtain a read
119+
// lock. The writer can Acquire(N) to obtain a write lock, excluding all
120+
// of the readers. If we allow the readers to jump ahead in the queue,
121+
// the writer will starve — there is always one token available for every
122+
// reader.
123+
break
124+
}
125+
126+
s.cur += w.n
127+
s.waiters.Remove(next)
128+
close(w.ready)
129+
}
130+
s.mu.Unlock()
131+
}

semaphore/semaphore_bench_test.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright 2017 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
// +build go1.7
6+
7+
package semaphore
8+
9+
import (
10+
"fmt"
11+
"testing"
12+
13+
"golang.org/x/net/context"
14+
)
15+
16+
// weighted is an interface matching a subset of *Weighted. It allows
17+
// alternate implementations for testing and benchmarking.
18+
type weighted interface {
19+
Acquire(context.Context, int64) error
20+
TryAcquire(int64) bool
21+
Release(int64)
22+
}
23+
24+
// semChan implements Weighted using a channel for
25+
// comparing against the condition variable-based implementation.
26+
type semChan chan struct{}
27+
28+
func newSemChan(n int64) semChan {
29+
return semChan(make(chan struct{}, n))
30+
}
31+
32+
func (s semChan) Acquire(_ context.Context, n int64) error {
33+
for i := int64(0); i < n; i++ {
34+
s <- struct{}{}
35+
}
36+
return nil
37+
}
38+
39+
func (s semChan) TryAcquire(n int64) bool {
40+
if int64(len(s))+n > int64(cap(s)) {
41+
return false
42+
}
43+
44+
for i := int64(0); i < n; i++ {
45+
s <- struct{}{}
46+
}
47+
return true
48+
}
49+
50+
func (s semChan) Release(n int64) {
51+
for i := int64(0); i < n; i++ {
52+
<-s
53+
}
54+
}
55+
56+
// acquireN calls Acquire(size) on sem N times and then calls Release(size) N times.
57+
func acquireN(b *testing.B, sem weighted, size int64, N int) {
58+
b.ResetTimer()
59+
for i := 0; i < b.N; i++ {
60+
for j := 0; j < N; j++ {
61+
sem.Acquire(context.Background(), size)
62+
}
63+
for j := 0; j < N; j++ {
64+
sem.Release(size)
65+
}
66+
}
67+
}
68+
69+
// tryAcquireN calls TryAcquire(size) on sem N times and then calls Release(size) N times.
70+
func tryAcquireN(b *testing.B, sem weighted, size int64, N int) {
71+
b.ResetTimer()
72+
for i := 0; i < b.N; i++ {
73+
for j := 0; j < N; j++ {
74+
if !sem.TryAcquire(size) {
75+
b.Fatalf("TryAcquire(%v) = false, want true", size)
76+
}
77+
}
78+
for j := 0; j < N; j++ {
79+
sem.Release(size)
80+
}
81+
}
82+
}
83+
84+
func BenchmarkNewSeq(b *testing.B) {
85+
for _, cap := range []int64{1, 128} {
86+
b.Run(fmt.Sprintf("Weighted-%d", cap), func(b *testing.B) {
87+
for i := 0; i < b.N; i++ {
88+
_ = NewWeighted(cap)
89+
}
90+
})
91+
b.Run(fmt.Sprintf("semChan-%d", cap), func(b *testing.B) {
92+
for i := 0; i < b.N; i++ {
93+
_ = newSemChan(cap)
94+
}
95+
})
96+
}
97+
}
98+
99+
func BenchmarkAcquireSeq(b *testing.B) {
100+
for _, c := range []struct {
101+
cap, size int64
102+
N int
103+
}{
104+
{1, 1, 1},
105+
{2, 1, 1},
106+
{16, 1, 1},
107+
{128, 1, 1},
108+
{2, 2, 1},
109+
{16, 2, 8},
110+
{128, 2, 64},
111+
{2, 1, 2},
112+
{16, 8, 2},
113+
{128, 64, 2},
114+
} {
115+
for _, w := range []struct {
116+
name string
117+
w weighted
118+
}{
119+
{"Weighted", NewWeighted(c.cap)},
120+
{"semChan", newSemChan(c.cap)},
121+
} {
122+
b.Run(fmt.Sprintf("%s-acquire-%d-%d-%d", w.name, c.cap, c.size, c.N), func(b *testing.B) {
123+
acquireN(b, w.w, c.size, c.N)
124+
})
125+
b.Run(fmt.Sprintf("%s-tryAcquire-%d-%d-%d", w.name, c.cap, c.size, c.N), func(b *testing.B) {
126+
tryAcquireN(b, w.w, c.size, c.N)
127+
})
128+
}
129+
}
130+
}

0 commit comments

Comments
 (0)