|
16 | 16 |
|
17 | 17 | import com.rabbitmq.client.*; |
18 | 18 | import com.rabbitmq.client.observation.ObservationCollector; |
| 19 | +import io.micrometer.common.KeyValues; |
19 | 20 | import io.micrometer.observation.Observation; |
20 | 21 | import io.micrometer.observation.ObservationRegistry; |
21 | 22 | import java.io.IOException; |
@@ -56,8 +57,6 @@ public void publish( |
56 | 57 | byte[] body, |
57 | 58 | ConnectionInfo connectionInfo) |
58 | 59 | throws IOException { |
59 | | - // TODO: Is this for fire and forget or request reply too? If r-r then we have to have 2 |
60 | | - // contexts |
61 | 60 | Map<String, Object> headers; |
62 | 61 | if (properties.getHeaders() == null) { |
63 | 62 | headers = new HashMap<>(); |
@@ -103,13 +102,14 @@ public Consumer basicConsume(String queue, String consumerTag, Consumer consumer |
103 | 102 | @Override |
104 | 103 | public GetResponse basicGet(BasicGetCall call, String queue) { |
105 | 104 | return Observation.createNotStarted("rabbitmq.receive", registry) |
| 105 | + .highCardinalityKeyValues( |
| 106 | + KeyValues.of( |
| 107 | + RabbitMqObservationDocumentation.LowCardinalityTags.MESSAGING_OPERATION.withValue( |
| 108 | + "receive"), |
| 109 | + RabbitMqObservationDocumentation.LowCardinalityTags.MESSAGING_SYSTEM.withValue( |
| 110 | + "rabbitmq"))) |
106 | 111 | .observe( |
107 | 112 | () -> { |
108 | | - try { |
109 | | - Thread.sleep(100); |
110 | | - } catch (InterruptedException e) { |
111 | | - throw new RuntimeException(e); |
112 | | - } |
113 | 113 | GetResponse response = call.get(); |
114 | 114 | if (response != null) { |
115 | 115 | Map<String, Object> headers; |
|
0 commit comments