@@ -14,34 +14,74 @@ func Example_direct_exchange() {
1414 m := mockMessage {
1515 Message : "foo" ,
1616 }
17- w := NewWorker (
17+ w1 := NewWorker (
1818 WithSubj ("direct_queue" ),
1919 WithExchangeName ("direct_exchange" ),
20- WithRoutingKey ( "direct_queue " ),
21- WithTag ( "direct_queue " ),
20+ WithExchangeType ( "direct " ),
21+ WithRoutingKey ( "direct_exchange " ),
2222 WithRunFunc (func (ctx context.Context , m core.QueuedMessage ) error {
23- fmt .Println ("get data:" , string (m .Bytes ()))
23+ fmt .Println ("worker01 get data:" , string (m .Bytes ()))
24+ time .Sleep (100 * time .Millisecond )
2425 return nil
2526 }),
2627 )
28+
29+ q1 , err := queue .NewQueue (
30+ queue .WithWorker (w1 ),
31+ )
32+ if err != nil {
33+ w1 .opts .logger .Error (err )
34+ }
35+ q1 .Start ()
36+
37+ w2 := NewWorker (
38+ WithSubj ("direct_queue" ),
39+ WithExchangeName ("direct_exchange" ),
40+ WithExchangeType ("direct" ),
41+ WithRoutingKey ("direct_exchange" ),
42+ WithRunFunc (func (ctx context.Context , m core.QueuedMessage ) error {
43+ fmt .Println ("worker02 get data:" , string (m .Bytes ()))
44+ time .Sleep (100 * time .Millisecond )
45+ return nil
46+ }),
47+ )
48+
49+ q2 , err := queue .NewQueue (
50+ queue .WithWorker (w2 ),
51+ )
52+ if err != nil {
53+ w2 .opts .logger .Error (err )
54+ }
55+ q2 .Start ()
56+
57+ w := NewWorker (
58+ WithExchangeName ("direct_exchange" ),
59+ WithExchangeType ("direct" ),
60+ WithRoutingKey ("direct_exchange" ),
61+ )
62+
2763 q , err := queue .NewQueue (
2864 queue .WithWorker (w ),
29- queue .WithWorkerCount (1 ),
3065 )
3166 if err != nil {
3267 w .opts .logger .Error (err )
3368 }
3469
35- q .Start ()
3670 time .Sleep (200 * time .Millisecond )
3771 q .Queue (m )
3872 q .Queue (m )
73+ q .Queue (m )
74+ q .Queue (m )
3975 time .Sleep (200 * time .Millisecond )
4076 q .Release ()
77+ q1 .Release ()
78+ q2 .Release ()
4179
42- // Output:
43- // get data: foo
44- // get data: foo
80+ // Unordered Output:
81+ // worker01 get data: foo
82+ // worker02 get data: foo
83+ // worker01 get data: foo
84+ // worker02 get data: foo
4585}
4686
4787// Fanout Exchange
@@ -65,9 +105,7 @@ func Example_fanout_exchange() {
65105 if err != nil {
66106 w1 .opts .logger .Error (err )
67107 }
68-
69108 q1 .Start ()
70- time .Sleep (200 * time .Millisecond )
71109
72110 w2 := NewWorker (
73111 WithSubj ("fanout_queue_2" ),
@@ -85,9 +123,7 @@ func Example_fanout_exchange() {
85123 if err != nil {
86124 w2 .opts .logger .Error (err )
87125 }
88-
89126 q2 .Start ()
90- time .Sleep (200 * time .Millisecond )
91127
92128 w := NewWorker (
93129 WithExchangeName ("fanout_exchange" ),
0 commit comments