diff --git a/crates/backend/Cargo.toml b/crates/backend/Cargo.toml index 28affd0c..fb2f6a47 100644 --- a/crates/backend/Cargo.toml +++ b/crates/backend/Cargo.toml @@ -89,11 +89,11 @@ tokio = { version = "1.35.1", optional = true, default-features = false } [target.'cfg(not(windows))'.dependencies] # opendal backend - sftp is not supported on windows, see https://github.com/apache/incubator-opendal/issues/2963 -opendal = { version = "0.44.2", features = ["services-b2", "services-sftp", "services-swift"], optional = true } +opendal = { version = "0.45", features = ["services-b2", "services-sftp", "services-swift", "layers-blocking"], optional = true } [target.'cfg(windows)'.dependencies] # opendal backend -opendal = { version = "0.44.2", features = ["services-b2", "services-swift"], optional = true } +opendal = { version = "0.45", features = ["services-b2", "services-swift", "layers-blocking"], optional = true } [dev-dependencies] rstest = "0.18.2" diff --git a/crates/backend/src/opendal.rs b/crates/backend/src/opendal.rs index 9d21e5b7..70944fbe 100644 --- a/crates/backend/src/opendal.rs +++ b/crates/backend/src/opendal.rs @@ -9,7 +9,7 @@ use anyhow::Result; use bytes::Bytes; use log::trace; use opendal::{ - layers::{BlockingLayer, LoggingLayer, RetryLayer}, + layers::{BlockingLayer, ConcurrentLimitLayer, LoggingLayer, RetryLayer}, BlockingOperator, ErrorKind, Metakey, Operator, Scheme, }; use rayon::prelude::{IntoParallelIterator, ParallelIterator}; @@ -45,19 +45,30 @@ impl OpenDALBackend { /// * `path` - The path to the OpenDAL backend. /// * `options` - Additional options for the OpenDAL backend. pub fn new(path: impl AsRef, options: HashMap) -> Result { - let max_retries = match options.get("retry").map(std::string::String::as_str) { + let max_retries = match options.get("retry").map(String::as_str) { Some("false" | "off") => 0, None | Some("default") => consts::DEFAULT_RETRY, Some(value) => usize::from_str(value)?, }; + let connections = options + .get("connections") + .map(|c| usize::from_str(c)) + .transpose()?; let schema = Scheme::from_str(path.as_ref())?; + let mut operator = Operator::via_map(schema, options)? + .layer(RetryLayer::new().with_max_times(max_retries).with_jitter()); + + if let Some(connections) = connections { + operator = operator.layer(ConcurrentLimitLayer::new(connections)); + } + let _guard = runtime().enter(); - let operator = Operator::via_map(schema, options)? - .layer(RetryLayer::new().with_max_times(max_retries).with_jitter()) + let operator = operator .layer(LoggingLayer::default()) .layer(BlockingLayer::create()?) .blocking(); + Ok(Self { operator }) }