Skip to content

Commit d50948e

Browse files
committed
refactor: [torrust#1598] make recalculate udp avg connect processing time metric and update atomic
It also fixes a division by zero bug when the metrics is updated before the counter for number of conenctions has been increased. It only avoid the division by zero. I will propoerly fixed with independent request counter for the moving average calculation.
1 parent e6c05b6 commit d50948e

File tree

3 files changed

+73
-34
lines changed

3 files changed

+73
-34
lines changed

packages/udp-tracker-server/src/statistics/event/handler/response_sent.rs

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,13 @@ pub async fn handle_event(
1616
let (result_label_value, kind_label_value) = match kind {
1717
UdpResponseKind::Ok { req_kind } => match req_kind {
1818
UdpRequestKind::Connect => {
19-
let new_avg = stats_repository
20-
.recalculate_udp_avg_connect_processing_time_ns(req_processing_time)
21-
.await;
22-
23-
tracing::debug!("Updating average processing time metric for connect requests: {} ns", new_avg);
24-
2519
let mut label_set = LabelSet::from(context.clone());
2620
label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string()));
27-
match stats_repository
28-
.set_gauge(
29-
&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
30-
&label_set,
31-
new_avg,
32-
now,
33-
)
34-
.await
35-
{
36-
Ok(()) => {}
37-
Err(err) => tracing::error!("Failed to set gauge: {}", err),
38-
}
21+
22+
let _new_avg = stats_repository
23+
.recalculate_udp_avg_connect_processing_time_ns(req_processing_time, &label_set, now)
24+
.await;
25+
3926
(LabelValue::new("ok"), UdpRequestKind::Connect.into())
4027
}
4128
UdpRequestKind::Announce { announce_request } => {

packages/udp-tracker-server/src/statistics/metrics.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,23 @@ impl Metrics {
5151

5252
impl Metrics {
5353
#[allow(clippy::cast_precision_loss)]
54-
pub fn recalculate_udp_avg_connect_processing_time_ns(&self, req_processing_time: Duration) -> f64 {
54+
pub fn recalculate_udp_avg_connect_processing_time_ns(
55+
&mut self,
56+
req_processing_time: Duration,
57+
label_set: &LabelSet,
58+
now: DurationSinceUnixEpoch,
59+
) -> f64 {
5560
let req_processing_time = req_processing_time.as_nanos() as f64;
5661
let udp_connections_handled = (self.udp4_connections_handled() + self.udp6_connections_handled()) as f64;
5762

5863
let previous_avg = self.udp_avg_connect_processing_time_ns();
5964

60-
// Moving average: https://en.wikipedia.org/wiki/Moving_average
61-
let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_connections_handled;
65+
let new_avg = if udp_connections_handled == 0.0 {
66+
req_processing_time
67+
} else {
68+
// Moving average: https://en.wikipedia.org/wiki/Moving_average
69+
previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_connections_handled
70+
};
6271

6372
tracing::debug!(
6473
"Recalculated UDP average connect processing time: {} ns (previous: {} ns, req_processing_time: {} ns, udp_connections_handled: {})",
@@ -68,9 +77,25 @@ impl Metrics {
6877
udp_connections_handled
6978
);
7079

80+
self.update_udp_avg_connect_processing_time_ns(new_avg, label_set, now);
81+
7182
new_avg
7283
}
7384

85+
fn update_udp_avg_connect_processing_time_ns(&mut self, new_avg: f64, label_set: &LabelSet, now: DurationSinceUnixEpoch) {
86+
tracing::debug!("Updating average processing time metric for connect requests: {} ns", new_avg);
87+
88+
match self.set_gauge(
89+
&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
90+
label_set,
91+
new_avg,
92+
now,
93+
) {
94+
Ok(()) => {}
95+
Err(err) => tracing::error!("Failed to set gauge: {}", err),
96+
}
97+
}
98+
7499
#[allow(clippy::cast_precision_loss)]
75100
pub fn recalculate_udp_avg_announce_processing_time_ns(&self, req_processing_time: Duration) -> f64 {
76101
let req_processing_time = req_processing_time.as_nanos() as f64;

packages/udp-tracker-server/src/statistics/repository.rs

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,15 @@ impl Repository {
7373
result
7474
}
7575

76-
pub async fn recalculate_udp_avg_connect_processing_time_ns(&self, req_processing_time: Duration) -> f64 {
77-
let stats_lock = self.stats.write().await;
76+
pub async fn recalculate_udp_avg_connect_processing_time_ns(
77+
&self,
78+
req_processing_time: Duration,
79+
label_set: &LabelSet,
80+
now: DurationSinceUnixEpoch,
81+
) -> f64 {
82+
let mut stats_lock = self.stats.write().await;
7883

79-
let new_avg = stats_lock.recalculate_udp_avg_connect_processing_time_ns(req_processing_time);
84+
let new_avg = stats_lock.recalculate_udp_avg_connect_processing_time_ns(req_processing_time, label_set, now);
8085

8186
drop(stats_lock);
8287

@@ -338,7 +343,9 @@ mod tests {
338343

339344
// Calculate new average with processing time of 2000ns
340345
let processing_time = Duration::from_nanos(2000);
341-
let new_avg = repo.recalculate_udp_avg_connect_processing_time_ns(processing_time).await;
346+
let new_avg = repo
347+
.recalculate_udp_avg_connect_processing_time_ns(processing_time, &connect_labels, now)
348+
.await;
342349

343350
// Moving average: previous_avg + (new_value - previous_avg) / total_connections
344351
// 1000 + (2000 - 1000) / 3 = 1000 + 333.33 = 1333.33
@@ -436,17 +443,25 @@ mod tests {
436443
#[tokio::test]
437444
async fn recalculate_average_methods_should_handle_zero_connections_gracefully() {
438445
let repo = Repository::new();
446+
let now = CurrentClock::now();
439447

440448
// Test with zero connections (should not panic, should handle division by zero)
441449
let processing_time = Duration::from_nanos(1000);
442450

443-
let connect_avg = repo.recalculate_udp_avg_connect_processing_time_ns(processing_time).await;
451+
let connect_labels = LabelSet::from([("request_kind", "connect")]);
452+
let connect_avg = repo
453+
.recalculate_udp_avg_connect_processing_time_ns(processing_time, &connect_labels, now)
454+
.await;
455+
456+
let _announce_labels = LabelSet::from([("request_kind", "announce")]);
444457
let announce_avg = repo.recalculate_udp_avg_announce_processing_time_ns(processing_time).await;
458+
459+
let _scrape_labels = LabelSet::from([("request_kind", "scrape")]);
445460
let scrape_avg = repo.recalculate_udp_avg_scrape_processing_time_ns(processing_time).await;
446461

447462
// With 0 total connections, the formula becomes 0 + (1000 - 0) / 0
448463
// This should handle the division by zero case gracefully
449-
assert!(connect_avg.is_infinite() || connect_avg.is_nan());
464+
assert!((connect_avg - 1000.0).abs() < f64::EPSILON);
450465
assert!(announce_avg.is_infinite() || announce_avg.is_nan());
451466
assert!(scrape_avg.is_infinite() || scrape_avg.is_nan());
452467
}
@@ -500,7 +515,10 @@ mod tests {
500515

501516
// Test with very large processing time
502517
let large_duration = Duration::from_secs(1); // 1 second = 1,000,000,000 ns
503-
let new_avg = repo.recalculate_udp_avg_connect_processing_time_ns(large_duration).await;
518+
let connect_labels = LabelSet::from([("request_kind", "connect")]);
519+
let new_avg = repo
520+
.recalculate_udp_avg_connect_processing_time_ns(large_duration, &connect_labels, now)
521+
.await;
504522

505523
// Should handle large numbers without overflow
506524
assert!(new_avg > 0.0);
@@ -575,6 +593,7 @@ mod tests {
575593
#[tokio::test]
576594
async fn it_should_handle_moving_average_calculation_before_any_connections_are_recorded() {
577595
let repo = Repository::new();
596+
let connect_labels = LabelSet::from([("request_kind", "connect")]);
578597
let now = CurrentClock::now();
579598

580599
// This test checks the behavior of `recalculate_udp_avg_connect_processing_time_ns``
@@ -591,12 +610,13 @@ mod tests {
591610

592611
// First calculation: no connections recorded yet, should result in infinity
593612
let processing_time_1 = Duration::from_nanos(2000);
594-
let avg_1 = repo.recalculate_udp_avg_connect_processing_time_ns(processing_time_1).await;
613+
let avg_1 = repo
614+
.recalculate_udp_avg_connect_processing_time_ns(processing_time_1, &connect_labels, now)
615+
.await;
595616

596-
// Division by zero: 1000 + (2000 - 1000) / 0 = infinity
597617
assert!(
598-
avg_1.is_infinite(),
599-
"First calculation should be infinite due to division by zero"
618+
(avg_1 - 2000.0).abs() < f64::EPSILON,
619+
"First calculation should be 2000, but got {avg_1}"
600620
);
601621

602622
// Now add one connection and try again
@@ -605,10 +625,17 @@ mod tests {
605625
.await
606626
.unwrap();
607627

608-
// Second calculation: 1 connection, but previous average is infinity
628+
// Second calculation: 1 connection
609629
let processing_time_2 = Duration::from_nanos(3000);
610-
let avg_2 = repo.recalculate_udp_avg_connect_processing_time_ns(processing_time_2).await;
630+
let connect_labels = LabelSet::from([("request_kind", "connect")]);
631+
let avg_2 = repo
632+
.recalculate_udp_avg_connect_processing_time_ns(processing_time_2, &connect_labels, now)
633+
.await;
611634

635+
// There is one connection, so the average should be:
636+
// 2000 + (3000 - 2000) / 1 = 2000 + 1000 = 3000
637+
// This is because one connection is not counted yet in the average calculation,
638+
// so the average is simply the processing time of the second connection.
612639
assert!(
613640
(avg_2 - 3000.0).abs() < f64::EPSILON,
614641
"Second calculation should be 3000ns, but got {avg_2}"

0 commit comments

Comments
 (0)