File tree Expand file tree Collapse file tree 3 files changed +102
-0
lines changed Expand file tree Collapse file tree 3 files changed +102
-0
lines changed Original file line number Diff line number Diff line change 1+ module example_01
2+
3+ go 1.18
4+
5+ require (
6+ github.com/golang-queue/queue v0.1.3
7+ github.com/golang-queue/rabbitmq v0.0.2-0.20210822122542-200fdcf19ebf
8+ )
9+
10+ require (
11+ github.com/goccy/go-json v0.9.7 // indirect
12+ github.com/rabbitmq/amqp091-go v1.3.4 // indirect
13+ )
14+
15+ replace github.com/golang-queue/rabbitmq => ../../
Original file line number Diff line number Diff line change 1+ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c =
2+ github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM =
3+ github.com/goccy/go-json v0.9.7 /go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I =
4+ github.com/golang-queue/queue v0.1.3 h1:FGIrn8e0fN8EmL3glP0rFEcYVtWUGMEeqX4h4nnzh40 =
5+ github.com/golang-queue/queue v0.1.3 /go.mod h1:h/PhaoMwT5Jc4sQNus7APgWBUItm6QC9k6JtmwrsRos =
6+ github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc =
7+ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM =
8+ github.com/rabbitmq/amqp091-go v1.3.4 h1:tXuIslN1nhDqs2t6Jrz3BAoqvt4qIZzxvdbdcxWtHYU =
9+ github.com/rabbitmq/amqp091-go v1.3.4 /go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM =
10+ github.com/stretchr/testify v1.7.3 h1:dAm0YRdRQlWojc3CrCRgPBzG5f941d0zvAKu7qY4e+I =
11+ go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA =
12+ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA =
Original file line number Diff line number Diff line change 1+ package main
2+
3+ import (
4+ "context"
5+ "encoding/json"
6+ "fmt"
7+ "log"
8+ "time"
9+
10+ "github.com/golang-queue/queue"
11+ "github.com/golang-queue/queue/core"
12+ "github.com/golang-queue/rabbitmq"
13+ )
14+
15+ type job struct {
16+ Message string
17+ }
18+
19+ func (j * job ) Bytes () []byte {
20+ b , err := json .Marshal (j )
21+ if err != nil {
22+ panic (err )
23+ }
24+ return b
25+ }
26+
27+ func main () {
28+ taskN := 100
29+ rets := make (chan string , taskN )
30+
31+ // define the worker
32+ w := rabbitmq .NewWorker (
33+ rabbitmq .WithSubj ("sample_1" ),
34+ rabbitmq .WithRunFunc (func (ctx context.Context , m core.QueuedMessage ) error {
35+ var v * job
36+ if err := json .Unmarshal (m .Bytes (), & v ); err != nil {
37+ return err
38+ }
39+ rets <- v .Message
40+ return nil
41+ }),
42+ )
43+
44+ // define the queue
45+ q , err := queue .NewQueue (
46+ queue .WithWorkerCount (10 ),
47+ queue .WithWorker (w ),
48+ )
49+ if err != nil {
50+ log .Fatal (err )
51+ }
52+
53+ // start the five worker
54+ q .Start ()
55+
56+ // assign tasks in queue
57+ for i := 0 ; i < taskN ; i ++ {
58+ go func (i int ) {
59+ if err := q .Queue (& job {
60+ Message : fmt .Sprintf ("handle the job: %d" , i + 1 ),
61+ }); err != nil {
62+ log .Fatal (err )
63+ }
64+ }(i )
65+ }
66+
67+ // wait until all tasks done
68+ for i := 0 ; i < taskN ; i ++ {
69+ fmt .Println ("message:" , <- rets )
70+ time .Sleep (50 * time .Millisecond )
71+ }
72+
73+ // shutdown the service and notify all the worker
74+ q .Release ()
75+ }
You can’t perform that action at this time.
0 commit comments