diff --git a/go.mod b/go.mod index 33349b1..3c93251 100644 --- a/go.mod +++ b/go.mod @@ -3,15 +3,16 @@ module github.com/golang-queue/nsq go 1.18 require ( - github.com/golang-queue/queue v0.1.0 + github.com/golang-queue/queue v0.1.3-0.20220624082030-00c8a316580c github.com/nsqio/go-nsq v1.1.0 - github.com/stretchr/testify v1.7.1 + github.com/stretchr/testify v1.7.3 go.uber.org/goleak v1.1.12 ) require ( - github.com/davecgh/go-spew v1.1.0 // indirect - github.com/golang/snappy v0.0.1 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/goccy/go-json v0.9.7 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index a83db8c..ea9b5b2 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,14 @@ -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/golang-queue/queue v0.1.0 h1:SVzDMgqjjb09tRkYCjeDHU5FyErFLR6lt0qbcw40Nx4= -github.com/golang-queue/queue v0.1.0/go.mod h1:g1yxxDl8JMo4gUfxt11fjjU3SXU1ah61EvwshmDoSIs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM= +github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/golang-queue/queue v0.1.3-0.20220624082030-00c8a316580c h1:3ZQLDgYpsgzn3+ctrvMVN5Yf50LK0iMqcUlE498Gqm4= +github.com/golang-queue/queue v0.1.3-0.20220624082030-00c8a316580c/go.mod h1:h/PhaoMwT5Jc4sQNus7APgWBUItm6QC9k6JtmwrsRos= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= -github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -15,9 +19,11 @@ github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQT github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.3 h1:dAm0YRdRQlWojc3CrCRgPBzG5f941d0zvAKu7qY4e+I= +github.com/stretchr/testify v1.7.3/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= @@ -51,5 +57,6 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/nsq.go b/nsq.go index c3a2ed8..a7fae30 100644 --- a/nsq.go +++ b/nsq.go @@ -96,7 +96,7 @@ func (w *Worker) startConsumer() (err error) { return err } -func (w *Worker) handle(job queue.Job) error { +func (w *Worker) handle(job *queue.Job) error { // create channel with buffer size 1 to avoid goroutine leak done := make(chan error, 1) panicChan := make(chan interface{}, 1) @@ -145,7 +145,7 @@ func (w *Worker) handle(job queue.Job) error { // Run start the worker func (w *Worker) Run(task core.QueuedMessage) error { - data, _ := task.(queue.Job) + data, _ := task.(*queue.Job) if err := w.handle(data); err != nil { return err @@ -202,7 +202,7 @@ loop: } var data queue.Job _ = json.Unmarshal(task.Body, &data) - return data, nil + return &data, nil case <-time.After(1 * time.Second): if clock == 5 { break loop diff --git a/nsq_test.go b/nsq_test.go index bd38418..5490298 100644 --- a/nsq_test.go +++ b/nsq_test.go @@ -268,7 +268,7 @@ func TestGoroutinePanic(t *testing.T) { } func TestHandleTimeout(t *testing.T) { - job := queue.Job{ + job := &queue.Job{ Timeout: 100 * time.Millisecond, Payload: []byte("foo"), } @@ -283,7 +283,7 @@ func TestHandleTimeout(t *testing.T) { assert.Error(t, err) assert.Equal(t, context.DeadlineExceeded, err) - job = queue.Job{ + job = &queue.Job{ Timeout: 150 * time.Millisecond, Payload: []byte("foo"), } @@ -308,7 +308,7 @@ func TestHandleTimeout(t *testing.T) { } func TestJobComplete(t *testing.T) { - job := queue.Job{ + job := &queue.Job{ Timeout: 100 * time.Millisecond, Payload: []byte("foo"), } @@ -322,7 +322,7 @@ func TestJobComplete(t *testing.T) { assert.Error(t, err) assert.Equal(t, errors.New("job completed"), err) - job = queue.Job{ + job = &queue.Job{ Timeout: 250 * time.Millisecond, Payload: []byte("foo"), }