Skip to content

Commit 06acdcb

Browse files
committed
feat(backends): Add throttle option to opendal backend
1 parent 32a5737 commit 06acdcb

File tree

2 files changed

+40
-4
lines changed

2 files changed

+40
-4
lines changed

crates/backend/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,16 +82,17 @@ rand = { version = "0.8.5", optional = true }
8282
semver = { version = "1.0.22", optional = true }
8383

8484
# opendal backend
85+
bytesize = "1.3.0"
8586
rayon = { version = "1.9.0", optional = true }
8687
tokio = { version = "1.36.0", optional = true, default-features = false }
8788

8889
[target.'cfg(not(windows))'.dependencies]
8990
# opendal backend - sftp is not supported on windows, see https://github.com/apache/incubator-opendal/issues/2963
90-
opendal = { version = "0.45", features = ["services-b2", "services-sftp", "services-swift", "layers-blocking"], optional = true }
91+
opendal = { version = "0.45", features = ["services-b2", "services-sftp", "services-swift", "layers-blocking", "layers-throttle"], optional = true }
9192

9293
[target.'cfg(windows)'.dependencies]
9394
# opendal backend
94-
opendal = { version = "0.45", features = ["services-b2", "services-swift", "layers-blocking"], optional = true }
95+
opendal = { version = "0.45", features = ["services-b2", "services-swift", "layers-blocking", "layers-throttle"], optional = true }
9596

9697
[dev-dependencies]
9798
rstest = { workspace = true }

crates/backend/src/opendal.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
/// `OpenDAL` backend for rustic.
22
use std::{collections::HashMap, path::PathBuf, str::FromStr, sync::OnceLock};
33

4-
use anyhow::Result;
4+
use anyhow::{anyhow, Error, Result};
55
use bytes::Bytes;
6+
use bytesize::ByteSize;
67
use log::trace;
78
use opendal::{
8-
layers::{BlockingLayer, ConcurrentLimitLayer, LoggingLayer, RetryLayer},
9+
layers::{BlockingLayer, ConcurrentLimitLayer, LoggingLayer, RetryLayer, ThrottleLayer},
910
BlockingOperator, ErrorKind, Metakey, Operator, Scheme,
1011
};
1112
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
@@ -34,6 +35,31 @@ fn runtime() -> &'static Runtime {
3435
})
3536
}
3637

38+
/// Throttling parameters
39+
///
40+
/// Note: Throttle implements FromStr to read it from something like "10kiB,10MB"
41+
#[derive(Debug, Clone, Copy)]
42+
pub struct Throttle {
43+
bandwidth: u32,
44+
burst: u32,
45+
}
46+
47+
impl FromStr for Throttle {
48+
type Err = Error;
49+
fn from_str(s: &str) -> Result<Self> {
50+
let mut values = s
51+
.split(',')
52+
.map(|s| ByteSize::from_str(s).map_err(|err| anyhow!("Error: {err}")))
53+
.map(|b| -> Result<u32> { Ok(b?.as_u64().try_into()?) });
54+
let bandwidth = values
55+
.next()
56+
.ok_or_else(|| anyhow!("no bandwidth given"))??;
57+
let burst = values.next().ok_or_else(|| anyhow!("no burst given"))??;
58+
let throttle = Throttle { bandwidth, burst };
59+
Ok(throttle)
60+
}
61+
}
62+
3763
impl OpenDALBackend {
3864
/// Create a new openDAL backend.
3965
///
@@ -60,10 +86,19 @@ impl OpenDALBackend {
6086
.map(|c| usize::from_str(c))
6187
.transpose()?;
6288

89+
let throttle = options
90+
.get("throttle")
91+
.map(|t| Throttle::from_str(t))
92+
.transpose()?;
93+
6394
let schema = Scheme::from_str(path.as_ref())?;
6495
let mut operator = Operator::via_map(schema, options)?
6596
.layer(RetryLayer::new().with_max_times(max_retries).with_jitter());
6697

98+
if let Some(Throttle { bandwidth, burst }) = throttle {
99+
operator = operator.layer(ThrottleLayer::new(bandwidth, burst));
100+
}
101+
67102
if let Some(connections) = connections {
68103
operator = operator.layer(ConcurrentLimitLayer::new(connections));
69104
}

0 commit comments

Comments
 (0)