Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ getrandom = { version = "0.3" }
gloo-timers = { version = "0.3" }
hmac = { version = "0.12" }
litemap = "0.7.4"
native-tls = "0.2"
openssl = { version = "0.10.72" }
opentelemetry = { version = "0.30", features = ["trace"] }
opentelemetry_sdk = "0.30"
Expand Down Expand Up @@ -143,6 +144,8 @@ tokio = { version = "1.0", default-features = false, features = [
"macros",
"time",
] }
tokio-native-tls = "0.3"
tokio-socks = "0.5"
tracing = "0.1.40"
tracing-subscriber = "0.3"
url = "2.2"
Expand Down
3 changes: 3 additions & 0 deletions sdk/core/azure_core_amqp/.dict.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ amqps
sastoken
smallulong
smalluint
proxyuser
testuser
testpass
7 changes: 6 additions & 1 deletion sdk/core/azure_core_amqp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ serde.workspace = true
serde_amqp = { workspace = true, optional = true }
serde_bytes = { workspace = true, optional = true }
tokio.workspace = true
tokio-native-tls = { workspace = true, optional = true }
tokio-socks = { workspace = true, optional = true }
native-tls = { workspace = true, optional = true }
tracing.workspace = true
typespec = { workspace = true, features = ["amqp"] }
typespec_client_core = { workspace = true }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything in here is in azure_core. azure_* crates outside of core should not reference typespec crates.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@heaths I referenced it in sdk/eventhubs/azure_messaging_eventhubs/Cargo.toml in order to use the sanitize. any suggestion about that? remove sanitize? leave it in the file? what is the best course of action here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm saying that the sanitization function should be exported in azure_core as well, in the same module (hopefully, unless we missed something). For any azure_* crates we're trying to "hide" the typespec crates as an implementation detail. Everything they export is imported and re-exported by azure_core in the same module path. A couple types we "override", but internally make convertible to the types they "override" in typespec.

typespec_macros.workspace = true

[dev-dependencies]
Expand All @@ -49,9 +53,10 @@ fe2o3_amqp = [
"serde_amqp",
"serde_bytes",
]
socks5 = ["dep:tokio-socks", "dep:native-tls", "dep:tokio-native-tls"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You also need to add this to the docs.rs metadata table below or it won't be documented. We intentionally don't document the others as they are meant more for internal testing / development.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added


[lints]
workspace = true

[package.metadata.docs.rs]
features = ["fe2o3_amqp"]
features = ["fe2o3_amqp", "socks5"]
43 changes: 43 additions & 0 deletions sdk/core/azure_core_amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,49 @@ Azure AMQP crate for consumption of AMQP based packages in the Azure SDK for Rus

This crate is part of a collection of crates: for more information please refer to [https://github.com/azure/azure-sdk-for-rust](https://github.com/azure/azure-sdk-for-rust).

## SOCKS5 Proxy Support

This crate supports SOCKS5 proxy connections for corporate environments.

**Note**: SOCKS5 support requires enabling the `socks5` feature:

```toml
[dependencies]
azure_core_amqp = { version = "0.8", features = ["socks5"] }
```

SOCKS5 support is enabled by configuring the `custom_endpoint` option with a SOCKS5 URL:

```rust,no_run
use azure_core_amqp::AmqpConnectionOptions;

# fn main() -> Result<(), Box<dyn std::error::Error>> {
let options = AmqpConnectionOptions {
custom_endpoint: Some("socks5h://proxy.contoso.com:8080".parse()?),
..Default::default()
};
# Ok(())
# }
```

### Supported Protocols

- **socks5://** - Standard SOCKS5 with local DNS resolution
- **socks5h://** - SOCKS5 with proxy-side DNS resolution (recommended for corporate environments)

### Authentication

Username/password authentication is supported via the proxy URL:
```text
socks5://username:[email protected]:1080
```

All proxy credentials are automatically masked in log output for security.

### Dependencies

SOCKS5 support adds the `tokio-socks` dependency to the crate.

## Testing the AMQP Client

The AMQP package is tested using the standard `cargo test` command line:
Expand Down
222 changes: 162 additions & 60 deletions sdk/core/azure_core_amqp/src/fe2o3/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
use super::error::{Fe2o3ConnectionError, Fe2o3ConnectionOpenError, Fe2o3TransportError};
use crate::connection::{AmqpConnectionApis, AmqpConnectionOptions};
use crate::error::AmqpErrorKind;
#[cfg(feature = "socks5")]
use crate::socks5::SocksConnection;
use crate::value::{AmqpOrderedMap, AmqpSymbol, AmqpValue};
use azure_core::{http::Url, Result};
use fe2o3_amqp::connection::ConnectionHandle;
use std::{borrow::BorrowMut, sync::OnceLock};
use tokio::sync::Mutex;
use tracing::{debug, warn};
use tracing::{debug, error, warn};

#[derive(Debug, Default)]
pub(crate) struct Fe2o3AmqpConnection {
Expand Down Expand Up @@ -44,6 +46,117 @@ impl Drop for Fe2o3AmqpConnection {
}
}

macro_rules! configure_builder {
($id:expr, $url:expr, $options:expr) => {{
let mut builder = fe2o3_amqp::Connection::builder()
.sasl_profile(fe2o3_amqp::sasl_profile::SaslProfile::Anonymous)
.alt_tls_establishment(true)
.container_id($id)
.max_frame_size(65536);

if let Some(frame_size) = $options.max_frame_size {
builder = builder.max_frame_size(frame_size);
}

if let Some(channel_max) = $options.channel_max {
builder = builder.channel_max(channel_max);
}

if let Some(idle_timeout) = $options.idle_timeout {
builder = builder.idle_time_out(idle_timeout.whole_milliseconds() as u32);
}

if let Some(outgoing_locales) = &$options.outgoing_locales {
builder = builder.set_outgoing_locales(
outgoing_locales
.iter()
.map(|s| fe2o3_amqp_types::primitives::Symbol::from(s.as_str()))
.collect(),
);
}

if let Some(incoming_locales) = &$options.incoming_locales {
builder = builder.set_incoming_locales(
incoming_locales
.iter()
.map(|s| fe2o3_amqp_types::primitives::Symbol::from(s.as_str()))
.collect(),
);
}

if let Some(offered_capabilities) = &$options.offered_capabilities {
builder = builder
.set_offered_capabilities(offered_capabilities.iter().map(Into::into).collect());
}

if let Some(desired_capabilities) = &$options.desired_capabilities {
builder = builder
.set_desired_capabilities(desired_capabilities.iter().map(Into::into).collect());
}

if let Some(properties) = &$options.properties {
builder = builder.properties(
properties
.iter()
.map(|(k, v)| (k.into(), v.into()))
.collect(),
);
}

if let Some(buffer_size) = $options.buffer_size {
builder = builder.buffer_size(buffer_size);
}

// Set hostname if using custom endpoint
if $options.custom_endpoint.is_some() {
builder = builder.hostname($url.host_str());
}

builder
}};
}

#[cfg(feature = "socks5")]
async fn prepare_socks5_connection(
id: &str,
url: &Url,
endpoint: &Url,
) -> Result<Option<Box<dyn crate::socks5::SocksStream>>> {
if endpoint.scheme() == "socks5" || endpoint.scheme() == "socks5h" {
debug!(
connection_id = %id,
proxy_scheme = %endpoint.scheme(),
target_host = %url.host_str().unwrap_or("unknown"),
"Opening AMQP connection through SOCKS5 proxy"
);

let stream = SocksConnection::connect(endpoint, url).await.map_err(|e| {
error!(
connection_id = %id,
proxy_url = %SocksConnection::mask_credentials(endpoint),
error = %e,
"Failed to establish SOCKS5 connection"
);
e
})?;

Ok(Some(stream))
} else {
Ok(None)
}
}

#[cfg(not(feature = "socks5"))]
async fn validate_no_socks5(endpoint: &Url) -> Result<()> {
if endpoint.scheme() == "socks5" || endpoint.scheme() == "socks5h" {
return Err(azure_core::Error::with_message(
azure_core::error::ErrorKind::Amqp,
"SOCKS5 proxy support is not enabled. Enable the 'socks5' feature to use SOCKS5 proxies."
));
}
Ok(())
}

#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl AmqpConnectionApis for Fe2o3AmqpConnection {
Expand All @@ -57,70 +170,59 @@ impl AmqpConnectionApis for Fe2o3AmqpConnection {
let options = options.unwrap_or_default();
let mut endpoint = url.clone();

// All AMQP clients have a similar set of options.
let mut builder = fe2o3_amqp::Connection::builder()
.sasl_profile(fe2o3_amqp::sasl_profile::SaslProfile::Anonymous)
.alt_tls_establishment(true)
.container_id(id)
.max_frame_size(65536);

if let Some(frame_size) = options.max_frame_size {
builder = builder.max_frame_size(frame_size);
if let Some(custom_endpoint) = options.custom_endpoint.clone() {
endpoint = custom_endpoint;
}

if let Some(channel_max) = options.channel_max {
builder = builder.channel_max(channel_max);
}
if let Some(idle_timeout) = options.idle_timeout {
builder = builder.idle_time_out(idle_timeout.whole_milliseconds() as u32);
}
if let Some(outgoing_locales) = options.outgoing_locales {
builder = builder.set_outgoing_locales(
outgoing_locales
.into_iter()
.map(fe2o3_amqp_types::primitives::Symbol::from)
.collect(),
);
}
if let Some(incoming_locales) = options.incoming_locales {
builder = builder.set_incoming_locales(
incoming_locales
.into_iter()
.map(fe2o3_amqp_types::primitives::Symbol::from)
.collect(),
);
}
if let Some(offered_capabilities) = options.offered_capabilities {
builder = builder.set_offered_capabilities(
offered_capabilities.into_iter().map(Into::into).collect(),
);
}
if let Some(desired_capabilities) = options.desired_capabilities {
builder = builder.set_desired_capabilities(
desired_capabilities.into_iter().map(Into::into).collect(),
);
}
if let Some(properties) = options.properties {
builder = builder.properties(
properties
.iter()
.map(|(k, v)| (k.into(), v.into()))
.collect(),
);
}
if let Some(buffer_size) = options.buffer_size {
builder = builder.buffer_size(buffer_size);
}
let connection_type = if endpoint.scheme().starts_with("socks5") {
"socks5"
} else {
"direct"
};

if let Some(custom_endpoint) = options.custom_endpoint {
endpoint = custom_endpoint;
builder = builder.hostname(url.host_str());
}
let connection = {
#[cfg(feature = "socks5")]
{
let stream = prepare_socks5_connection(&id, &url, &endpoint).await?;
let builder = configure_builder!(&id, &url, &options);

if let Some(stream) = stream {
debug!(connection_id = %id, "Opening AMQP connection with SOCKS5 stream");
builder.open_with_stream(stream).await.map_err(|e| {
error!(connection_id = %id, error = %e, "Failed to open AMQP connection over SOCKS5 stream");
azure_core::Error::from(Fe2o3ConnectionOpenError(e))
})?
} else {
debug!(connection_id = %id, endpoint = %endpoint, "Opening direct AMQP connection");
let endpoint_str = endpoint.to_string();
builder.open(endpoint).await.map_err(|e| {
error!(connection_id = %id, endpoint = %endpoint_str, error = %e, "Failed to open direct AMQP connection");
azure_core::Error::from(Fe2o3ConnectionOpenError(e))
})?
}
}
#[cfg(not(feature = "socks5"))]
{
validate_no_socks5(&endpoint).await?;
let builder = configure_builder!(&id, &url, &options);

debug!(connection_id = %id, endpoint = %endpoint, "Opening direct AMQP connection");
let endpoint_str = endpoint.to_string();
builder.open(endpoint).await.map_err(|e| {
error!(connection_id = %id, endpoint = %endpoint_str, error = %e, "Failed to open direct AMQP connection");
azure_core::Error::from(Fe2o3ConnectionOpenError(e))
})?
}
};

debug!(
connection_id = %id,
connection_type = %connection_type,
"AMQP connection opened successfully"
);

self.connection
.set(Mutex::new(builder.open(endpoint).await.map_err(|e| {
azure_core::Error::from(Fe2o3ConnectionOpenError(e))
})?))
.set(Mutex::new(connection))
.map_err(|_| Self::connection_already_set())?;
Ok(())
}
Expand Down
Loading