Skip to content

Commit 7958273

Browse files
authored
ref(kafka): Add producer_name tag, and more rdkafka stats (#5031)
ref STREAM-324 * The producer_name tag identifies a particular producer instance in Relay. It's almost identical to broker_name, but broker_name is not generally available because in theory one producer can talk to many brokers (though relay doesn't do that), and some statistics are related to the producer's internal datastructures and don't have that tag. Also, we're going to add this tag to other producers in other services (and we have _many_ producers in getsentry talking to a single broker there), so it pays off to be consistent. With a bit of luck we might even have a proper generic dashboard for all of our producers in our entire infrastructure.. someday. * Add a few metrics that we think might be useful to see if Relay is impacted by a broker being unhealthy. Aside, it would be really nice if Relay moved to rust-arroyo for producing. I'm implementing the same metrics there. Or maybe not its entire producer wrapper but at least reusing its stats callback.
1 parent 7ec5087 commit 7958273

File tree

4 files changed

+263
-46
lines changed

4 files changed

+263
-46
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
- Add Jwm to the supported image types. ([#4975](https://github.com/getsentry/relay/pull/4975))
3333
- Process logs in all non-proxy Relays. ([#4973](https://github.com/getsentry/relay/pull/4973))
3434
- Add support for pre-hashed signatures. ([#5012](https://github.com/getsentry/relay/pull/5012))
35+
- Add producer_name tag, and more rdkafka stats. ([#5031](https://github.com/getsentry/relay/pull/5031))
3536

3637
## 25.7.0
3738

relay-kafka/src/producer/mod.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,10 +214,13 @@ impl Producer {
214214
return Err(ClientError::MissingTopic);
215215
};
216216

217+
let producer_name = producer.context().producer_name();
218+
217219
metric!(
218220
histogram(KafkaHistograms::KafkaMessageSize) = payload.len() as u64,
219221
variant = variant,
220222
topic = topic_name,
223+
producer_name = producer_name
221224
);
222225

223226
let mut headers = headers
@@ -240,6 +243,7 @@ impl Producer {
240243
counter(KafkaCounters::ProducerPartitionKeyRateLimit) += 1,
241244
variant = variant,
242245
topic = topic_name,
246+
producer_name = producer_name
243247
);
244248

245249
headers.insert(Header {
@@ -262,7 +266,8 @@ impl Producer {
262266
metric!(
263267
gauge(KafkaGauges::InFlightCount) = producer.in_flight_count() as u64,
264268
variant = variant,
265-
topic = topic_name
269+
topic = topic_name,
270+
producer_name = producer_name
266271
);
267272
});
268273

@@ -276,7 +281,8 @@ impl Producer {
276281
metric!(
277282
counter(KafkaCounters::ProducerEnqueueError) += 1,
278283
variant = variant,
279-
topic = topic_name
284+
topic = topic_name,
285+
producer_name = producer_name
280286
);
281287
ClientError::SendFailed(error)
282288
})?;
@@ -433,9 +439,17 @@ impl KafkaClientBuilder {
433439
client_config.set(config_p.name.as_str(), config_p.value.as_str());
434440
}
435441

442+
// Extract producer name from client.id, fallback to config name, then "unknown"
443+
let producer_name = config_params
444+
.iter()
445+
.find(|p| p.name == "client.id")
446+
.map(|p| p.value.clone())
447+
.or_else(|| config_name.clone())
448+
.unwrap_or_else(|| "unknown".to_owned());
449+
436450
let producer = Arc::new(
437451
client_config
438-
.create_with_context(Context)
452+
.create_with_context(Context::new(producer_name))
439453
.map_err(ClientError::InvalidConfig)?,
440454
);
441455

relay-kafka/src/producer/utils.rs

Lines changed: 103 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -74,51 +74,120 @@ where
7474

7575
/// Kafka client and producer context that logs statistics and producer errors.
7676
#[derive(Debug)]
77-
pub struct Context;
77+
pub struct Context {
78+
/// Producer name for deployment identification
79+
producer_name: String,
80+
}
81+
82+
impl Context {
83+
pub fn new(producer_name: String) -> Self {
84+
Self { producer_name }
85+
}
86+
87+
pub fn producer_name(&self) -> &str {
88+
&self.producer_name
89+
}
90+
}
7891

7992
impl ClientContext for Context {
8093
/// Report client statistics as statsd metrics.
8194
///
8295
/// This method is only called if `statistics.interval.ms` is configured.
8396
fn stats(&self, statistics: rdkafka::Statistics) {
84-
relay_statsd::metric!(gauge(KafkaGauges::MessageCount) = statistics.msg_cnt);
85-
relay_statsd::metric!(gauge(KafkaGauges::MessageCountMax) = statistics.msg_max);
86-
relay_statsd::metric!(gauge(KafkaGauges::MessageSize) = statistics.msg_size);
87-
relay_statsd::metric!(gauge(KafkaGauges::MessageSizeMax) = statistics.msg_size_max);
97+
let producer_name = &self.producer_name;
98+
99+
relay_statsd::metric!(
100+
gauge(KafkaGauges::MessageCount) = statistics.msg_cnt,
101+
producer_name = producer_name
102+
);
103+
relay_statsd::metric!(
104+
gauge(KafkaGauges::MessageCountMax) = statistics.msg_max,
105+
producer_name = producer_name
106+
);
107+
relay_statsd::metric!(
108+
gauge(KafkaGauges::MessageSize) = statistics.msg_size,
109+
producer_name = producer_name
110+
);
111+
relay_statsd::metric!(
112+
gauge(KafkaGauges::MessageSizeMax) = statistics.msg_size_max,
113+
producer_name = producer_name
114+
);
115+
relay_statsd::metric!(
116+
gauge(KafkaGauges::TxMsgs) = statistics.txmsgs as u64,
117+
producer_name = producer_name
118+
);
88119

89120
for (_, broker) in statistics.brokers {
90121
relay_statsd::metric!(
91122
gauge(KafkaGauges::OutboundBufferRequests) = broker.outbuf_cnt as u64,
92-
broker_name = &broker.name
123+
broker_name = &broker.name,
124+
producer_name = producer_name
93125
);
94126
relay_statsd::metric!(
95127
gauge(KafkaGauges::OutboundBufferMessages) = broker.outbuf_msg_cnt as u64,
96-
broker_name = &broker.name
128+
broker_name = &broker.name,
129+
producer_name = producer_name
97130
);
98131
if let Some(connects) = broker.connects {
99132
relay_statsd::metric!(
100133
gauge(KafkaGauges::Connects) = connects as u64,
101-
broker_name = &broker.name
134+
broker_name = &broker.name,
135+
producer_name = producer_name
102136
);
103137
}
104138
if let Some(disconnects) = broker.disconnects {
105139
relay_statsd::metric!(
106140
gauge(KafkaGauges::Disconnects) = disconnects as u64,
107-
broker_name = &broker.name
141+
broker_name = &broker.name,
142+
producer_name = producer_name
108143
);
109144
}
110145
if let Some(int_latency) = broker.int_latency {
111146
relay_statsd::metric!(
112-
gauge(KafkaGauges::ProducerQueueLatency) = int_latency.max as u64,
113-
broker_name = &broker.name
147+
gauge(KafkaGauges::BrokerIntLatencyAvg) = (int_latency.avg / 1000) as u64,
148+
broker_name = &broker.name,
149+
producer_name = producer_name
150+
);
151+
relay_statsd::metric!(
152+
gauge(KafkaGauges::BrokerIntLatencyP99) = (int_latency.p99 / 1000) as u64,
153+
broker_name = &broker.name,
154+
producer_name = producer_name
114155
);
115156
}
116157
if let Some(outbuf_latency) = broker.outbuf_latency {
117158
relay_statsd::metric!(
118-
gauge(KafkaGauges::RequestQueueLatency) = outbuf_latency.max as u64,
119-
broker_name = &broker.name
159+
gauge(KafkaGauges::BrokerOutbufLatencyAvg) = (outbuf_latency.avg / 1000) as u64,
160+
broker_name = &broker.name,
161+
producer_name = producer_name
162+
);
163+
relay_statsd::metric!(
164+
gauge(KafkaGauges::BrokerOutbufLatencyP99) = (outbuf_latency.p99 / 1000) as u64,
165+
broker_name = &broker.name,
166+
producer_name = producer_name
167+
);
168+
}
169+
if let Some(rtt) = broker.rtt {
170+
relay_statsd::metric!(
171+
gauge(KafkaGauges::BrokerRttAvg) = (rtt.avg / 1000) as u64,
172+
broker_name = &broker.name,
173+
producer_name = producer_name
174+
);
175+
relay_statsd::metric!(
176+
gauge(KafkaGauges::BrokerRttP99) = (rtt.p99 / 1000) as u64,
177+
broker_name = &broker.name,
178+
producer_name = producer_name
120179
);
121180
}
181+
relay_statsd::metric!(
182+
gauge(KafkaGauges::BrokerTx) = broker.tx,
183+
broker_name = &broker.name,
184+
producer_name = producer_name
185+
);
186+
relay_statsd::metric!(
187+
gauge(KafkaGauges::BrokerTxBytes) = broker.txbytes,
188+
broker_name = &broker.name,
189+
producer_name = producer_name
190+
);
122191
}
123192
}
124193
}
@@ -132,18 +201,28 @@ impl ProducerContext for Context {
132201
// TODO: any `Accepted` outcomes (e.g. spans) should be logged here instead of on the caller side,
133202
// such that we do not over-report in the error case.
134203

135-
if let Err((error, message)) = result {
136-
relay_log::error!(
137-
error = error as &dyn Error,
138-
payload_len = message.payload_len(),
139-
tags.topic = message.topic(),
140-
"failed to produce message to Kafka (delivery callback)",
141-
);
204+
match result {
205+
Ok(message) => {
206+
metric!(
207+
counter(KafkaCounters::ProduceStatusSuccess) += 1,
208+
topic = message.topic(),
209+
producer_name = &self.producer_name
210+
);
211+
}
212+
Err((error, message)) => {
213+
relay_log::error!(
214+
error = error as &dyn Error,
215+
payload_len = message.payload_len(),
216+
tags.topic = message.topic(),
217+
"failed to produce message to Kafka (delivery callback)",
218+
);
142219

143-
metric!(
144-
counter(KafkaCounters::ProcessingProduceError) += 1,
145-
topic = message.topic()
146-
);
220+
metric!(
221+
counter(KafkaCounters::ProduceStatusError) += 1,
222+
topic = message.topic(),
223+
producer_name = &self.producer_name
224+
);
225+
}
147226
}
148227
}
149228
}

0 commit comments

Comments
 (0)