Skip to content

feat: add rate limit support #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 41 additions & 9 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/linuxsuren/api-testing/pkg/limit"
"github.com/linuxsuren/api-testing/pkg/render"
"github.com/linuxsuren/api-testing/pkg/runner"
"github.com/linuxsuren/api-testing/pkg/testing"
Expand All @@ -23,6 +24,10 @@ type runOption struct {
requestIgnoreError bool
thread int64
context context.Context
qps int32
burst int32
limiter limit.RateLimiter
startTime time.Time
}

// CreateRunCommand returns the run command
Expand All @@ -45,12 +50,20 @@ See also https://github.com/LinuxSuRen/api-testing/tree/master/sample`,
flags.DurationVarP(&opt.requestTimeout, "request-timeout", "", time.Minute, "Timeout for per request")
flags.BoolVarP(&opt.requestIgnoreError, "request-ignore-error", "", false, "Indicate if ignore the request error")
flags.Int64VarP(&opt.thread, "thread", "", 1, "Threads of the execution")
flags.Int32VarP(&opt.qps, "qps", "", 5, "QPS")
flags.Int32VarP(&opt.burst, "burst", "", 5, "burst")
return
}

func (o *runOption) runE(cmd *cobra.Command, args []string) (err error) {
var files []string
o.startTime = time.Now()
o.context = cmd.Context()
o.limiter = limit.NewDefaultRateLimiter(o.qps, o.burst)
defer func() {
cmd.Printf("consume: %s\n", time.Now().Sub(o.startTime).String())
o.limiter.Stop()
}()

if files, err = filepath.Glob(o.pattern); err == nil {
for i := range files {
Expand All @@ -74,12 +87,14 @@ func (o *runOption) runSuiteWithDuration(suite string) (err error) {
timeout = time.NewTicker(time.Second)
}
errChannel := make(chan error, 10*o.thread)
stopSingal := make(chan struct{}, 1)
var wait sync.WaitGroup

for !stop {
select {
case <-timeout.C:
stop = true
stopSingal <- struct{}{}
case err = <-errChannel:
if err != nil {
stop = true
Expand All @@ -89,9 +104,6 @@ func (o *runOption) runSuiteWithDuration(suite string) (err error) {
continue
}
wait.Add(1)
if o.duration <= 0 {
stop = true
}

go func(ch chan error, sem *semaphore.Weighted) {
now := time.Now()
Expand All @@ -102,16 +114,24 @@ func (o *runOption) runSuiteWithDuration(suite string) (err error) {
}()

dataContext := getDefaultContext()
ch <- o.runSuite(suite, dataContext, o.context)
ch <- o.runSuite(suite, dataContext, o.context, stopSingal)
}(errChannel, sem)
if o.duration <= 0 {
stop = true
}
}
}
err = <-errChannel

select {
case err = <-errChannel:
case <-stopSingal:
}

wait.Wait()
return
}

func (o *runOption) runSuite(suite string, dataContext map[string]interface{}, ctx context.Context) (err error) {
func (o *runOption) runSuite(suite string, dataContext map[string]interface{}, ctx context.Context, stopSingal chan struct{}) (err error) {
var testSuite *testing.TestSuite
if testSuite, err = testing.Parse(suite); err != nil {
return
Expand All @@ -131,11 +151,23 @@ func (o *runOption) runSuite(suite string, dataContext map[string]interface{}, c
testCase.Request.API = fmt.Sprintf("%s%s", testSuite.API, testCase.Request.API)
}

setRelativeDir(suite, &testCase)
var output interface{}
ctxWithTimeout, _ := context.WithTimeout(ctx, o.requestTimeout)
if output, err = runner.RunTestCase(&testCase, dataContext, ctxWithTimeout); err != nil && !o.requestIgnoreError {
select {
case <-stopSingal:
return
default:
// reuse the API prefix
if strings.HasPrefix(testCase.Request.API, "/") {
testCase.Request.API = fmt.Sprintf("%s%s", testSuite.API, testCase.Request.API)
}

setRelativeDir(suite, &testCase)
o.limiter.Accept()

ctxWithTimeout, _ := context.WithTimeout(ctx, o.requestTimeout)
if output, err = runner.RunTestCase(&testCase, dataContext, ctxWithTimeout); err != nil && !o.requestIgnoreError {
return
}
}
dataContext[testCase.Name] = output
}
Expand Down
9 changes: 7 additions & 2 deletions cmd/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/h2non/gock"
"github.com/linuxsuren/api-testing/pkg/limit"
"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -48,9 +49,13 @@ func TestRunSuite(t *testing.T) {

tt.prepare()
ctx := getDefaultContext()
opt := &runOption{requestTimeout: 30 * time.Second}
opt := &runOption{
requestTimeout: 30 * time.Second,
limiter: limit.NewDefaultRateLimiter(0, 0),
}
stopSingal := make(chan struct{}, 1)

err := opt.runSuite(tt.suiteFile, ctx, context.TODO())
err := opt.runSuite(tt.suiteFile, ctx, context.TODO(), stopSingal)
assert.Equal(t, tt.hasError, err != nil, err)
})
}
Expand Down
95 changes: 95 additions & 0 deletions pkg/limit/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package limit

import (
"sync"
"time"
)

type RateLimiter interface {
TryAccept() bool
Accept()
Stop()
Burst() int32
}

type defaultRateLimiter struct {
qps int32
burst int32
lastToken time.Time
singal chan struct{}
mu sync.Mutex
}

func NewDefaultRateLimiter(qps, burst int32) RateLimiter {
if qps <= 0 {
qps = 5
}
if burst <= 0 {
burst = 5
}
limiter := &defaultRateLimiter{
qps: qps,
burst: burst,
singal: make(chan struct{}, 1),
}
go limiter.updateBurst()
return limiter
}

func (r *defaultRateLimiter) TryAccept() bool {
_, ok := r.resver()
return ok
}

func (r *defaultRateLimiter) resver() (delay time.Duration, ok bool) {
delay = time.Now().Sub(r.lastToken) / time.Millisecond
r.lastToken = time.Now()
if delay > 0 {
ok = true
} else if r.Burst() > 0 {
r.Setburst(r.Burst() - 1)
ok = true
} else {
delay = time.Second / time.Duration(r.qps)
}
return
}

func (r *defaultRateLimiter) Accept() {
delay, ok := r.resver()
if ok {
return
}

if delay > 0 {
time.Sleep(delay)
}
return
}

func (r *defaultRateLimiter) Setburst(burst int32) {
r.mu.Lock()
defer r.mu.Unlock()
r.burst = burst
}

func (r *defaultRateLimiter) Burst() int32 {
r.mu.Lock()
defer r.mu.Unlock()
return r.burst
}

func (r *defaultRateLimiter) Stop() {
r.singal <- struct{}{}
}

func (r *defaultRateLimiter) updateBurst() {
for {
select {
case <-time.After(time.Second):
r.Setburst(r.Burst() + r.qps)
case <-r.singal:
return
}
}
}
27 changes: 27 additions & 0 deletions pkg/limit/limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package limit

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestXxx(t *testing.T) {
limiter := NewDefaultRateLimiter(1, 1)
num := 0

loop := true
go func(l RateLimiter) {
for loop {
l.Accept()
num += 1
}
}(limiter)

select {
case <-time.After(time.Second):
loop = false
}
assert.True(t, num <= 10)
}