|
4 | 4 | use super::error::{Fe2o3ConnectionError, Fe2o3ConnectionOpenError, Fe2o3TransportError};
|
5 | 5 | use crate::connection::{AmqpConnectionApis, AmqpConnectionOptions};
|
6 | 6 | use crate::error::AmqpErrorKind;
|
| 7 | +#[cfg(feature = "socks5")] |
7 | 8 | use crate::socks5::SocksConnection;
|
8 | 9 | use crate::value::{AmqpOrderedMap, AmqpSymbol, AmqpValue};
|
9 | 10 | use azure_core::{http::Url, Result};
|
@@ -45,6 +46,7 @@ impl Drop for Fe2o3AmqpConnection {
|
45 | 46 | }
|
46 | 47 | }
|
47 | 48 |
|
| 49 | + |
48 | 50 | #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
|
49 | 51 | #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
|
50 | 52 | impl AmqpConnectionApis for Fe2o3AmqpConnection {
|
@@ -123,58 +125,59 @@ impl AmqpConnectionApis for Fe2o3AmqpConnection {
|
123 | 125 | } else {
|
124 | 126 | "direct"
|
125 | 127 | };
|
126 |
| - let connection = if endpoint.scheme() == "socks5" || endpoint.scheme() == "socks5h" { |
127 |
| - debug!( |
128 |
| - connection_id = %id, |
129 |
| - proxy_scheme = %endpoint.scheme(), |
130 |
| - target_host = %url.host_str().unwrap_or("unknown"), |
131 |
| - "Opening AMQP connection through SOCKS5 proxy" |
132 |
| - ); |
133 |
| - |
134 |
| - // Use fe2o3's open_with_stream() to inject SOCKS5 connection |
135 |
| - let stream = SocksConnection::connect(&endpoint, &url) |
136 |
| - .await |
137 |
| - .map_err(|e| { |
138 |
| - error!( |
| 128 | + let connection = { |
| 129 | + #[cfg(feature = "socks5")] |
| 130 | + { |
| 131 | + if endpoint.scheme() == "socks5" || endpoint.scheme() == "socks5h" { |
| 132 | + debug!( |
139 | 133 | connection_id = %id,
|
140 |
| - proxy_url = %SocksConnection::mask_credentials(&endpoint), |
141 |
| - error = %e, |
142 |
| - "Failed to establish SOCKS5 connection" |
| 134 | + proxy_scheme = %endpoint.scheme(), |
| 135 | + target_host = %url.host_str().unwrap_or("unknown"), |
| 136 | + "Opening AMQP connection through SOCKS5 proxy" |
143 | 137 | );
|
144 |
| - e |
145 |
| - })?; |
146 | 138 |
|
147 |
| - debug!( |
148 |
| - connection_id = %id, |
149 |
| - "Opening AMQP connection with SOCKS5 stream" |
150 |
| - ); |
| 139 | + let stream = |
| 140 | + SocksConnection::connect(&endpoint, &url) |
| 141 | + .await |
| 142 | + .map_err(|e| { |
| 143 | + error!( |
| 144 | + connection_id = %id, |
| 145 | + proxy_url = %SocksConnection::mask_credentials(&endpoint), |
| 146 | + error = %e, |
| 147 | + "Failed to establish SOCKS5 connection" |
| 148 | + ); |
| 149 | + e |
| 150 | + })?; |
151 | 151 |
|
152 |
| - builder.open_with_stream(stream).await.map_err(|e| { |
153 |
| - error!( |
154 |
| - connection_id = %id, |
155 |
| - error = %e, |
156 |
| - "Failed to open AMQP connection over SOCKS5 stream" |
157 |
| - ); |
158 |
| - azure_core::Error::from(Fe2o3ConnectionOpenError(e)) |
159 |
| - })? |
160 |
| - } else { |
161 |
| - debug!( |
162 |
| - connection_id = %id, |
163 |
| - endpoint = %endpoint, |
164 |
| - "Opening direct AMQP connection" |
165 |
| - ); |
166 |
| - |
167 |
| - // Maintain existing behavior for non-SOCKS5 URLs |
168 |
| - let endpoint_str = endpoint.to_string(); |
169 |
| - builder.open(endpoint).await.map_err(|e| { |
170 |
| - error!( |
171 |
| - connection_id = %id, |
172 |
| - endpoint = %endpoint_str, |
173 |
| - error = %e, |
174 |
| - "Failed to open direct AMQP connection" |
175 |
| - ); |
176 |
| - azure_core::Error::from(Fe2o3ConnectionOpenError(e)) |
177 |
| - })? |
| 152 | + debug!(connection_id = %id, "Opening AMQP connection with SOCKS5 stream"); |
| 153 | + builder.open_with_stream(stream).await.map_err(|e| { |
| 154 | + error!(connection_id = %id, error = %e, "Failed to open AMQP connection over SOCKS5 stream"); |
| 155 | + azure_core::Error::from(Fe2o3ConnectionOpenError(e)) |
| 156 | + })? |
| 157 | + } else { |
| 158 | + debug!(connection_id = %id, endpoint = %endpoint, "Opening direct AMQP connection"); |
| 159 | + let endpoint_str = endpoint.to_string(); |
| 160 | + builder.open(endpoint).await.map_err(|e| { |
| 161 | + error!(connection_id = %id, endpoint = %endpoint_str, error = %e, "Failed to open direct AMQP connection"); |
| 162 | + azure_core::Error::from(Fe2o3ConnectionOpenError(e)) |
| 163 | + })? |
| 164 | + } |
| 165 | + } |
| 166 | + #[cfg(not(feature = "socks5"))] |
| 167 | + { |
| 168 | + if endpoint.scheme() == "socks5" || endpoint.scheme() == "socks5h" { |
| 169 | + return Err(azure_core::Error::with_message( |
| 170 | + azure_core::error::ErrorKind::Amqp, |
| 171 | + "SOCKS5 proxy support is not enabled. Enable the 'socks5' feature to use SOCKS5 proxies." |
| 172 | + )); |
| 173 | + } |
| 174 | + debug!(connection_id = %id, endpoint = %endpoint, "Opening direct AMQP connection"); |
| 175 | + let endpoint_str = endpoint.to_string(); |
| 176 | + builder.open(endpoint).await.map_err(|e| { |
| 177 | + error!(connection_id = %id, endpoint = %endpoint_str, error = %e, "Failed to open direct AMQP connection"); |
| 178 | + azure_core::Error::from(Fe2o3ConnectionOpenError(e)) |
| 179 | + })? |
| 180 | + } |
178 | 181 | };
|
179 | 182 |
|
180 | 183 | debug!(
|
|
0 commit comments