Skip to content

Commit cadedad

Browse files
Support for GET /api/queues/detailed
1 parent c74ff42 commit cadedad

File tree

7 files changed

+307
-13
lines changed

7 files changed

+307
-13
lines changed

src/api.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ use crate::requests::{
2020
FederationUpstreamParams, GlobalRuntimeParameterDefinition, SHOVEL_COMPONENT, StreamParams,
2121
};
2222
use crate::responses::{
23-
AuthenticationAttemptStatistics, ClusterTags, DeprecatedFeatureList, FeatureFlag, FeatureFlagList, FeatureFlagStability,
24-
FeatureFlagState, FederationUpstream, GetMessage, OAuthConfiguration, Overview,
25-
SchemaDefinitionSyncStatus, VirtualHostDefinitionSet, WarmStandbyReplicationStatus,
23+
AuthenticationAttemptStatistics, ClusterTags, DeprecatedFeatureList, FeatureFlag,
24+
FeatureFlagList, FeatureFlagStability, FeatureFlagState, FederationUpstream, GetMessage,
25+
OAuthConfiguration, Overview, SchemaDefinitionSyncStatus, VirtualHostDefinitionSet,
26+
WarmStandbyReplicationStatus,
2627
};
2728
use crate::{
2829
commons::{BindingDestinationType, SupportedProtocol, UserLimitTarget, VirtualHostLimitTarget},
@@ -520,6 +521,14 @@ where
520521
Ok(response)
521522
}
522523

524+
/// Lists all queues and streams across the cluster. Compared to [`list_queues`], provides more queue metrics.
525+
/// See [Queues Guide](https://www.rabbitmq.com/docs/queues) and [RabbitMQ Streams Guide](https://www.rabbitmq.com/docs/streams) to learn more.
526+
pub async fn list_queues_with_details(&self) -> Result<Vec<responses::DetailedQueueInfo>> {
527+
let response = self.http_get("queues/detailed", None, None).await?;
528+
let response = response.json().await?;
529+
Ok(response)
530+
}
531+
523532
/// Lists all exchanges across the cluster.
524533
/// See [Exchanges Guide](https://www.rabbitmq.com/docs/exchanges) to learn more.
525534
pub async fn list_exchanges(&self) -> Result<Vec<responses::ExchangeInfo>> {
@@ -1883,8 +1892,13 @@ where
18831892
}
18841893

18851894
/// Returns authentication attempt statistics for a given node.
1886-
pub async fn auth_attempts_statistics(&self, node: &str) -> Result<Vec<AuthenticationAttemptStatistics>> {
1887-
let response = self.http_get(path!("auth", "attempts", node), None, None).await?;
1895+
pub async fn auth_attempts_statistics(
1896+
&self,
1897+
node: &str,
1898+
) -> Result<Vec<AuthenticationAttemptStatistics>> {
1899+
let response = self
1900+
.http_get(path!("auth", "attempts", node), None, None)
1901+
.await?;
18881902
let response = response.json().await?;
18891903
Ok(response)
18901904
}

src/blocking_api.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ use crate::requests::{
2020
FederationUpstreamParams, GlobalRuntimeParameterDefinition, SHOVEL_COMPONENT, StreamParams,
2121
};
2222
use crate::responses::{
23-
AuthenticationAttemptStatistics, ClusterTags, DeprecatedFeatureList, FeatureFlag, FeatureFlagList, FeatureFlagStability,
24-
FeatureFlagState, FederationUpstream, GetMessage, OAuthConfiguration, VirtualHostDefinitionSet,
25-
WarmStandbyReplicationStatus,
23+
AuthenticationAttemptStatistics, ClusterTags, DeprecatedFeatureList, FeatureFlag,
24+
FeatureFlagList, FeatureFlagStability, FeatureFlagState, FederationUpstream, GetMessage,
25+
OAuthConfiguration, VirtualHostDefinitionSet, WarmStandbyReplicationStatus,
2626
};
2727
use crate::{
2828
commons::{BindingDestinationType, SupportedProtocol, UserLimitTarget, VirtualHostLimitTarget},
@@ -466,6 +466,12 @@ where
466466
self.get_api_request(path!("queues", virtual_host))
467467
}
468468

469+
/// Lists all queues and streams across the cluster. Compared to [`list_queues`], provides more queue metrics.
470+
/// See [Queues Guide](https://www.rabbitmq.com/docs/queues) and [RabbitMQ Streams Guide](https://www.rabbitmq.com/docs/streams) to learn more.
471+
pub fn list_queues_with_details(&self) -> Result<Vec<responses::DetailedQueueInfo>> {
472+
self.get_api_request("queues/detailed")
473+
}
474+
469475
/// Lists all exchanges across the cluster.
470476
/// See [Exchanges Guide](https://www.rabbitmq.com/docs/exchanges) to learn more.
471477
pub fn list_exchanges(&self) -> Result<Vec<responses::ExchangeInfo>> {
@@ -1749,7 +1755,10 @@ where
17491755
}
17501756

17511757
/// Returns authentication attempt statistics for a given node.
1752-
pub fn auth_attempts_statistics(&self, node: &str) -> Result<Vec<AuthenticationAttemptStatistics>> {
1758+
pub fn auth_attempts_statistics(
1759+
&self,
1760+
node: &str,
1761+
) -> Result<Vec<AuthenticationAttemptStatistics>> {
17531762
let response = self.http_get(path!("auth", "attempts", node), None, None)?;
17541763
let response = response.json()?;
17551764
Ok(response)

src/responses.rs

Lines changed: 180 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ use std::{fmt, ops};
1919

2020
use crate::commons::{
2121
BindingDestinationType, ChannelId, MessageTransferAcknowledgementMode, OverflowBehavior,
22-
PolicyTarget, QueueType, SupportedProtocol, Username, VirtualHostName, X_ARGUMENT_KEY_X_OVERFLOW,
23-
X_ARGUMENT_KEY_X_QUEUE_TYPE,
22+
PolicyTarget, QueueType, SupportedProtocol, Username, VirtualHostName,
23+
X_ARGUMENT_KEY_X_OVERFLOW, X_ARGUMENT_KEY_X_QUEUE_TYPE,
2424
};
2525
use crate::error::ConversionError;
2626
use crate::formatting::*;
@@ -979,6 +979,184 @@ pub struct QueueInfo {
979979
pub unacknowledged_message_count: u64,
980980
}
981981

982+
/// Represents detailed queue information with extended metrics and garbage collection details.
983+
/// This is an enhanced version of `QueueInfo` that includes additional fields from the detailed queues endpoint.
984+
#[derive(Debug, Deserialize, Clone)]
985+
#[cfg_attr(feature = "tabled", derive(Tabled))]
986+
#[allow(dead_code)]
987+
pub struct DetailedQueueInfo {
988+
pub name: String,
989+
pub vhost: VirtualHostName,
990+
#[serde(rename(deserialize = "type"))]
991+
pub queue_type: String,
992+
pub durable: bool,
993+
pub auto_delete: bool,
994+
pub exclusive: bool,
995+
#[cfg_attr(feature = "tabled", tabled(display = "display_arg_table"))]
996+
pub arguments: XArguments,
997+
998+
#[serde(default = "undefined")]
999+
pub node: String,
1000+
#[serde(default)]
1001+
pub state: String,
1002+
// only quorum queues and streams will have this
1003+
#[cfg_attr(feature = "tabled", tabled(display = "display_option"))]
1004+
pub leader: Option<String>,
1005+
#[cfg_attr(feature = "tabled", tabled(display = "display_option"))]
1006+
pub members: Option<NodeList>,
1007+
#[cfg_attr(feature = "tabled", tabled(display = "display_option"))]
1008+
pub online: Option<NodeList>,
1009+
1010+
#[serde(default)]
1011+
pub memory: u64,
1012+
#[serde(rename(deserialize = "consumers"))]
1013+
#[serde(default)]
1014+
pub consumer_count: u16,
1015+
#[serde(default)]
1016+
pub consumer_utilisation: f32,
1017+
#[cfg_attr(feature = "tabled", tabled(skip))]
1018+
pub exclusive_consumer_tag: Option<String>,
1019+
1020+
#[cfg_attr(feature = "tabled", tabled(display = "display_option"))]
1021+
pub policy: Option<String>,
1022+
1023+
#[serde(default)]
1024+
pub message_bytes: u64,
1025+
#[serde(default)]
1026+
#[cfg_attr(feature = "tabled", tabled(skip))]
1027+
pub message_bytes_persistent: u64,
1028+
#[serde(default)]
1029+
#[cfg_attr(feature = "tabled", tabled(skip))]
1030+
pub message_bytes_ram: u64,
1031+
#[serde(default)]
1032+
#[cfg_attr(feature = "tabled", tabled(skip))]
1033+
pub message_bytes_ready: u64,
1034+
#[serde(default)]
1035+
#[cfg_attr(feature = "tabled", tabled(skip))]
1036+
pub message_bytes_unacknowledged: u64,
1037+
1038+
#[serde(rename(deserialize = "messages"))]
1039+
#[serde(default)]
1040+
pub message_count: u64,
1041+
#[serde(rename(deserialize = "messages_persistent"))]
1042+
#[serde(default)]
1043+
#[cfg_attr(feature = "tabled", tabled(skip))]
1044+
pub on_disk_message_count: u64,
1045+
#[serde(rename(deserialize = "messages_ram"))]
1046+
#[serde(default)]
1047+
#[cfg_attr(feature = "tabled", tabled(skip))]
1048+
pub in_memory_message_count: u64,
1049+
#[serde(rename(deserialize = "messages_unacknowledged"))]
1050+
#[serde(default)]
1051+
pub unacknowledged_message_count: u64,
1052+
1053+
// Additional detailed fields
1054+
#[cfg_attr(feature = "tabled", tabled(skip))]
1055+
pub garbage_collection: Option<GarbageCollectionDetails>,
1056+
#[cfg_attr(feature = "tabled", tabled(skip))]
1057+
pub io_batch_size: Option<u32>,
1058+
#[cfg_attr(feature = "tabled", tabled(skip))]
1059+
pub io_batch_size_avg: Option<f64>,
1060+
#[cfg_attr(feature = "tabled", tabled(skip))]
1061+
pub io_batch_size_details: Option<Rate>,
1062+
#[cfg_attr(feature = "tabled", tabled(skip))]
1063+
pub io_file_handle_open_attempt_avg_time: Option<f64>,
1064+
#[cfg_attr(feature = "tabled", tabled(skip))]
1065+
pub io_file_handle_open_attempt_avg_time_details: Option<Rate>,
1066+
#[cfg_attr(feature = "tabled", tabled(skip))]
1067+
pub io_read_avg_time: Option<f64>,
1068+
#[cfg_attr(feature = "tabled", tabled(skip))]
1069+
pub io_read_avg_time_details: Option<Rate>,
1070+
#[cfg_attr(feature = "tabled", tabled(skip))]
1071+
pub io_read_bytes: Option<u64>,
1072+
#[cfg_attr(feature = "tabled", tabled(skip))]
1073+
pub io_read_bytes_details: Option<Rate>,
1074+
#[cfg_attr(feature = "tabled", tabled(skip))]
1075+
pub io_read_count: Option<u64>,
1076+
#[cfg_attr(feature = "tabled", tabled(skip))]
1077+
pub io_read_count_details: Option<Rate>,
1078+
#[cfg_attr(feature = "tabled", tabled(skip))]
1079+
pub io_reopen_count: Option<u64>,
1080+
#[cfg_attr(feature = "tabled", tabled(skip))]
1081+
pub io_reopen_count_details: Option<Rate>,
1082+
#[cfg_attr(feature = "tabled", tabled(skip))]
1083+
pub io_seek_avg_time: Option<f64>,
1084+
#[cfg_attr(feature = "tabled", tabled(skip))]
1085+
pub io_seek_avg_time_details: Option<Rate>,
1086+
#[cfg_attr(feature = "tabled", tabled(skip))]
1087+
pub io_seek_count: Option<u64>,
1088+
#[cfg_attr(feature = "tabled", tabled(skip))]
1089+
pub io_seek_count_details: Option<Rate>,
1090+
#[cfg_attr(feature = "tabled", tabled(skip))]
1091+
pub io_sync_avg_time: Option<f64>,
1092+
#[cfg_attr(feature = "tabled", tabled(skip))]
1093+
pub io_sync_avg_time_details: Option<Rate>,
1094+
#[cfg_attr(feature = "tabled", tabled(skip))]
1095+
pub io_sync_count: Option<u64>,
1096+
#[cfg_attr(feature = "tabled", tabled(skip))]
1097+
pub io_sync_count_details: Option<Rate>,
1098+
#[cfg_attr(feature = "tabled", tabled(skip))]
1099+
pub io_write_avg_time: Option<f64>,
1100+
#[cfg_attr(feature = "tabled", tabled(skip))]
1101+
pub io_write_avg_time_details: Option<Rate>,
1102+
#[cfg_attr(feature = "tabled", tabled(skip))]
1103+
pub io_write_bytes: Option<u64>,
1104+
#[cfg_attr(feature = "tabled", tabled(skip))]
1105+
pub io_write_bytes_details: Option<Rate>,
1106+
#[cfg_attr(feature = "tabled", tabled(skip))]
1107+
pub io_write_count: Option<u64>,
1108+
#[cfg_attr(feature = "tabled", tabled(skip))]
1109+
pub io_write_count_details: Option<Rate>,
1110+
}
1111+
1112+
/// Garbage collection details for queue processes
1113+
#[derive(Debug, Deserialize, Clone)]
1114+
#[cfg_attr(feature = "tabled", derive(Tabled))]
1115+
#[allow(dead_code)]
1116+
pub struct GarbageCollectionDetails {
1117+
pub fullsweep_after: u32,
1118+
pub max_heap_size: u32,
1119+
pub min_bin_vheap_size: u32,
1120+
pub min_heap_size: u32,
1121+
pub minor_gcs: u32,
1122+
}
1123+
1124+
impl QueueOps for DetailedQueueInfo {
1125+
fn name(&self) -> &str {
1126+
&self.name
1127+
}
1128+
1129+
fn queue_type(&self) -> QueueType {
1130+
QueueType::from(self.queue_type.as_str())
1131+
}
1132+
1133+
fn policy_target_type(&self) -> PolicyTarget {
1134+
PolicyTarget::from(self.queue_type())
1135+
}
1136+
1137+
fn x_arguments(&self) -> &XArguments {
1138+
&self.arguments
1139+
}
1140+
}
1141+
1142+
impl NamedPolicyTargetObject for DetailedQueueInfo {
1143+
fn vhost(&self) -> String {
1144+
self.vhost.clone()
1145+
}
1146+
1147+
fn name(&self) -> String {
1148+
self.name.clone()
1149+
}
1150+
1151+
fn policy_target(&self) -> PolicyTarget {
1152+
self.policy_target_type()
1153+
}
1154+
1155+
fn does_match(&self, policy: &Policy) -> bool {
1156+
policy.does_match_object(self)
1157+
}
1158+
}
1159+
9821160
impl QueueOps for QueueInfo {
9831161
fn name(&self) -> &str {
9841162
&self.name

tests/async_auth_attempts_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub async fn test_async_auth_attempts_statistics() {
3939
for proto in stats {
4040
assert!(proto.failure_count + proto.success_count <= proto.all_attempt_count);
4141
}
42-
},
42+
}
4343
Err(_) => {
4444
// Assume the endpoint wasn't available in this RabbitMQ version
4545
}

tests/async_queue_tests.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,3 +146,49 @@ async fn test_async_list_queues_in_a_virtual_host() {
146146

147147
rc.delete_queue(vh_name, params.name, false).await.unwrap();
148148
}
149+
150+
#[tokio::test]
151+
async fn test_async_list_queues_with_details() {
152+
let endpoint = endpoint();
153+
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
154+
155+
let vh_name = "/";
156+
157+
let params = QueueParams::new_durable_classic_queue("rust.tests.cq.detailed.92734827364", None);
158+
let result1 = rc.declare_queue(vh_name, &params).await;
159+
assert!(result1.is_ok(), "declare_queue returned {result1:?}");
160+
161+
test_helpers::async_await_queue_metric_emission().await;
162+
163+
let result2 = rc.list_queues_with_details().await;
164+
assert!(
165+
result2.is_ok(),
166+
"list_queues_with_details returned {result2:?}"
167+
);
168+
169+
let detailed_queues = result2.unwrap();
170+
assert!(
171+
!detailed_queues.is_empty(),
172+
"Expected at least one queue in detailed list"
173+
);
174+
175+
// Find our test queue in the results
176+
let test_queue = detailed_queues.iter().find(|q| q.name == params.name);
177+
assert!(
178+
test_queue.is_some(),
179+
"Expected to find our test queue in detailed results"
180+
);
181+
182+
let queue = test_queue.unwrap();
183+
// Verify basic queue properties are present
184+
assert_eq!(queue.name, params.name);
185+
assert_eq!(queue.vhost, vh_name);
186+
assert_eq!(queue.durable, true);
187+
188+
// More fields
189+
if let Some(gc) = &queue.garbage_collection {
190+
assert!(gc.fullsweep_after > 1000);
191+
}
192+
193+
rc.delete_queue(vh_name, params.name, false).await.unwrap();
194+
}

tests/blocking_auth_attempts_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub fn test_blocking_auth_attempts_statistics() {
3939
for proto in stats {
4040
assert!(proto.failure_count + proto.success_count <= proto.all_attempt_count);
4141
}
42-
},
42+
}
4343
Err(_) => {
4444
// Assume the endpoint wasn't available in this RabbitMQ version
4545
}

tests/blocking_queue_tests.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,3 +146,50 @@ fn test_blocking_list_queues_in_a_virtual_host() {
146146

147147
rc.delete_queue(vh_name, params.name, false).unwrap();
148148
}
149+
150+
#[test]
151+
pub fn test_blocking_list_queues_with_details() {
152+
let endpoint = endpoint();
153+
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
154+
155+
let vh_name = "/";
156+
157+
let params =
158+
QueueParams::new_durable_classic_queue("rust.tests.cq.detailed.blocking.18273486", None);
159+
let result1 = rc.declare_queue(vh_name, &params);
160+
assert!(result1.is_ok(), "declare_queue returned {result1:?}");
161+
162+
test_helpers::await_queue_metric_emission();
163+
164+
let result2 = rc.list_queues_with_details();
165+
assert!(
166+
result2.is_ok(),
167+
"list_queues_with_details returned {result2:?}"
168+
);
169+
170+
let detailed_queues = result2.unwrap();
171+
assert!(
172+
!detailed_queues.is_empty(),
173+
"Expected at least one queue in detailed list"
174+
);
175+
176+
// Find our test queue in the results
177+
let test_queue = detailed_queues.iter().find(|q| q.name == params.name);
178+
assert!(
179+
test_queue.is_some(),
180+
"Expected to find our test queue in detailed results"
181+
);
182+
183+
let queue = test_queue.unwrap();
184+
// Verify basic queue properties are present
185+
assert_eq!(queue.name, params.name);
186+
assert_eq!(queue.vhost, vh_name);
187+
assert_eq!(queue.durable, true);
188+
189+
// More fields
190+
if let Some(gc) = &queue.garbage_collection {
191+
assert!(gc.fullsweep_after > 1000);
192+
}
193+
194+
rc.delete_queue(vh_name, params.name, false).unwrap();
195+
}

0 commit comments

Comments
 (0)