Skip to content

Commit 4a4554d

Browse files
authored
feat: support to set the request timeout (#15) (#16)
Co-authored-by: rick <[email protected]>
1 parent 3500832 commit 4a4554d

File tree

4 files changed

+170
-11
lines changed

4 files changed

+170
-11
lines changed

cmd/run.go

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"sync"
1010
"time"
1111

12+
"github.com/linuxsuren/api-testing/pkg/limit"
1213
"github.com/linuxsuren/api-testing/pkg/render"
1314
"github.com/linuxsuren/api-testing/pkg/runner"
1415
"github.com/linuxsuren/api-testing/pkg/testing"
@@ -23,6 +24,10 @@ type runOption struct {
2324
requestIgnoreError bool
2425
thread int64
2526
context context.Context
27+
qps int32
28+
burst int32
29+
limiter limit.RateLimiter
30+
startTime time.Time
2631
}
2732

2833
// CreateRunCommand returns the run command
@@ -45,12 +50,20 @@ See also https://github.com/LinuxSuRen/api-testing/tree/master/sample`,
4550
flags.DurationVarP(&opt.requestTimeout, "request-timeout", "", time.Minute, "Timeout for per request")
4651
flags.BoolVarP(&opt.requestIgnoreError, "request-ignore-error", "", false, "Indicate if ignore the request error")
4752
flags.Int64VarP(&opt.thread, "thread", "", 1, "Threads of the execution")
53+
flags.Int32VarP(&opt.qps, "qps", "", 5, "QPS")
54+
flags.Int32VarP(&opt.burst, "burst", "", 5, "burst")
4855
return
4956
}
5057

5158
func (o *runOption) runE(cmd *cobra.Command, args []string) (err error) {
5259
var files []string
60+
o.startTime = time.Now()
5361
o.context = cmd.Context()
62+
o.limiter = limit.NewDefaultRateLimiter(o.qps, o.burst)
63+
defer func() {
64+
cmd.Printf("consume: %s\n", time.Now().Sub(o.startTime).String())
65+
o.limiter.Stop()
66+
}()
5467

5568
if files, err = filepath.Glob(o.pattern); err == nil {
5669
for i := range files {
@@ -74,12 +87,14 @@ func (o *runOption) runSuiteWithDuration(suite string) (err error) {
7487
timeout = time.NewTicker(time.Second)
7588
}
7689
errChannel := make(chan error, 10*o.thread)
90+
stopSingal := make(chan struct{}, 1)
7791
var wait sync.WaitGroup
7892

7993
for !stop {
8094
select {
8195
case <-timeout.C:
8296
stop = true
97+
stopSingal <- struct{}{}
8398
case err = <-errChannel:
8499
if err != nil {
85100
stop = true
@@ -89,9 +104,6 @@ func (o *runOption) runSuiteWithDuration(suite string) (err error) {
89104
continue
90105
}
91106
wait.Add(1)
92-
if o.duration <= 0 {
93-
stop = true
94-
}
95107

96108
go func(ch chan error, sem *semaphore.Weighted) {
97109
now := time.Now()
@@ -102,16 +114,24 @@ func (o *runOption) runSuiteWithDuration(suite string) (err error) {
102114
}()
103115

104116
dataContext := getDefaultContext()
105-
ch <- o.runSuite(suite, dataContext, o.context)
117+
ch <- o.runSuite(suite, dataContext, o.context, stopSingal)
106118
}(errChannel, sem)
119+
if o.duration <= 0 {
120+
stop = true
121+
}
107122
}
108123
}
109-
err = <-errChannel
124+
125+
select {
126+
case err = <-errChannel:
127+
case <-stopSingal:
128+
}
129+
110130
wait.Wait()
111131
return
112132
}
113133

114-
func (o *runOption) runSuite(suite string, dataContext map[string]interface{}, ctx context.Context) (err error) {
134+
func (o *runOption) runSuite(suite string, dataContext map[string]interface{}, ctx context.Context, stopSingal chan struct{}) (err error) {
115135
var testSuite *testing.TestSuite
116136
if testSuite, err = testing.Parse(suite); err != nil {
117137
return
@@ -131,11 +151,23 @@ func (o *runOption) runSuite(suite string, dataContext map[string]interface{}, c
131151
testCase.Request.API = fmt.Sprintf("%s%s", testSuite.API, testCase.Request.API)
132152
}
133153

134-
setRelativeDir(suite, &testCase)
135154
var output interface{}
136-
ctxWithTimeout, _ := context.WithTimeout(ctx, o.requestTimeout)
137-
if output, err = runner.RunTestCase(&testCase, dataContext, ctxWithTimeout); err != nil && !o.requestIgnoreError {
155+
select {
156+
case <-stopSingal:
138157
return
158+
default:
159+
// reuse the API prefix
160+
if strings.HasPrefix(testCase.Request.API, "/") {
161+
testCase.Request.API = fmt.Sprintf("%s%s", testSuite.API, testCase.Request.API)
162+
}
163+
164+
setRelativeDir(suite, &testCase)
165+
o.limiter.Accept()
166+
167+
ctxWithTimeout, _ := context.WithTimeout(ctx, o.requestTimeout)
168+
if output, err = runner.RunTestCase(&testCase, dataContext, ctxWithTimeout); err != nil && !o.requestIgnoreError {
169+
return
170+
}
139171
}
140172
dataContext[testCase.Name] = output
141173
}

cmd/run_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/h2non/gock"
10+
"github.com/linuxsuren/api-testing/pkg/limit"
1011
"github.com/spf13/cobra"
1112
"github.com/stretchr/testify/assert"
1213
)
@@ -48,9 +49,13 @@ func TestRunSuite(t *testing.T) {
4849

4950
tt.prepare()
5051
ctx := getDefaultContext()
51-
opt := &runOption{requestTimeout: 30 * time.Second}
52+
opt := &runOption{
53+
requestTimeout: 30 * time.Second,
54+
limiter: limit.NewDefaultRateLimiter(0, 0),
55+
}
56+
stopSingal := make(chan struct{}, 1)
5257

53-
err := opt.runSuite(tt.suiteFile, ctx, context.TODO())
58+
err := opt.runSuite(tt.suiteFile, ctx, context.TODO(), stopSingal)
5459
assert.Equal(t, tt.hasError, err != nil, err)
5560
})
5661
}

pkg/limit/limiter.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package limit
2+
3+
import (
4+
"sync"
5+
"time"
6+
)
7+
8+
type RateLimiter interface {
9+
TryAccept() bool
10+
Accept()
11+
Stop()
12+
Burst() int32
13+
}
14+
15+
type defaultRateLimiter struct {
16+
qps int32
17+
burst int32
18+
lastToken time.Time
19+
singal chan struct{}
20+
mu sync.Mutex
21+
}
22+
23+
func NewDefaultRateLimiter(qps, burst int32) RateLimiter {
24+
if qps <= 0 {
25+
qps = 5
26+
}
27+
if burst <= 0 {
28+
burst = 5
29+
}
30+
limiter := &defaultRateLimiter{
31+
qps: qps,
32+
burst: burst,
33+
singal: make(chan struct{}, 1),
34+
}
35+
go limiter.updateBurst()
36+
return limiter
37+
}
38+
39+
func (r *defaultRateLimiter) TryAccept() bool {
40+
_, ok := r.resver()
41+
return ok
42+
}
43+
44+
func (r *defaultRateLimiter) resver() (delay time.Duration, ok bool) {
45+
delay = time.Now().Sub(r.lastToken) / time.Millisecond
46+
r.lastToken = time.Now()
47+
if delay > 0 {
48+
ok = true
49+
} else if r.Burst() > 0 {
50+
r.Setburst(r.Burst() - 1)
51+
ok = true
52+
} else {
53+
delay = time.Second / time.Duration(r.qps)
54+
}
55+
return
56+
}
57+
58+
func (r *defaultRateLimiter) Accept() {
59+
delay, ok := r.resver()
60+
if ok {
61+
return
62+
}
63+
64+
if delay > 0 {
65+
time.Sleep(delay)
66+
}
67+
return
68+
}
69+
70+
func (r *defaultRateLimiter) Setburst(burst int32) {
71+
r.mu.Lock()
72+
defer r.mu.Unlock()
73+
r.burst = burst
74+
}
75+
76+
func (r *defaultRateLimiter) Burst() int32 {
77+
r.mu.Lock()
78+
defer r.mu.Unlock()
79+
return r.burst
80+
}
81+
82+
func (r *defaultRateLimiter) Stop() {
83+
r.singal <- struct{}{}
84+
}
85+
86+
func (r *defaultRateLimiter) updateBurst() {
87+
for {
88+
select {
89+
case <-time.After(time.Second):
90+
r.Setburst(r.Burst() + r.qps)
91+
case <-r.singal:
92+
return
93+
}
94+
}
95+
}

pkg/limit/limiter_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package limit
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestXxx(t *testing.T) {
11+
limiter := NewDefaultRateLimiter(1, 1)
12+
num := 0
13+
14+
loop := true
15+
go func(l RateLimiter) {
16+
for loop {
17+
l.Accept()
18+
num += 1
19+
}
20+
}(limiter)
21+
22+
select {
23+
case <-time.After(time.Second):
24+
loop = false
25+
}
26+
assert.True(t, num <= 10)
27+
}

0 commit comments

Comments
 (0)