From d94c005a01731db2860063c45f017f75db420807 Mon Sep 17 00:00:00 2001 From: Chase Wilson Date: Fri, 24 Jul 2020 19:43:46 -0500 Subject: [PATCH 1/2] Switch to one single runtime for S3 storage Changed S3Backend::start_storage_transaction from creating a new Runtime on each call to using a lazily-initialized global runtime for all S3 instances to use. Since Tokio v0.1's Runtime has no mechanism to allow for blocking that doesn't require an &mut self, the runtime is stored inside of a mutex to allow for mutable access. We're already transitively dependent on parking_lot v0.10.2, so this is just adding it as a direct dependency (Removing tokio v0.2 will remove the other) version of parking_lot we depend on). When upgrading to tokio v0.2 this should be changed since v0.2 has the Runtime::handle method which allows runtime access from only &self. --- Cargo.lock | 1 + Cargo.toml | 1 + src/storage/s3.rs | 14 ++++++++------ 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c3949548f..eff2b7bae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -373,6 +373,7 @@ dependencies = [ "notify 4.0.15 (registry+https://github.com/rust-lang/crates.io-index)", "once_cell 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "params 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)", "path-slash 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "postgres 0.15.2 (registry+https://github.com/rust-lang/crates.io-index)", "procfs 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 63b5df519..d9a0341b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ path-slash = "0.1.3" once_cell = { version = "1.4.0", features = ["parking_lot"] } base64 = "0.12.1" strum = { version = "0.18.0", features = ["derive"] } +parking_lot = "0.10.2" # Data serialization and deserialization serde = { version = "1.0", features = ["derive"] } diff --git a/src/storage/s3.rs b/src/storage/s3.rs index bfd6d7b3a..3007770c3 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -4,6 +4,8 @@ use failure::Error; use futures::stream::{FuturesUnordered, Stream}; use futures::Future; use log::{error, warn}; +use once_cell::sync::Lazy; +use parking_lot::Mutex; use rusoto_core::region::Region; use rusoto_credential::DefaultCredentialsProvider; use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3}; @@ -16,10 +18,13 @@ mod test; pub(crate) use test::TestS3; pub(crate) static S3_BUCKET_NAME: &str = "rust-docs-rs"; +static S3_RUNTIME: Lazy> = + Lazy::new(|| Mutex::new(Runtime::new().expect("Failed to create S3 runtime"))); pub(crate) struct S3Backend { client: S3Client, bucket: String, + runtime: &'static Mutex, } impl S3Backend { @@ -27,6 +32,7 @@ impl S3Backend { Self { client, bucket: bucket.into(), + runtime: &*S3_RUNTIME, } } @@ -63,16 +69,12 @@ impl S3Backend { } pub(super) fn start_storage_transaction(&self) -> Result { - Ok(S3StorageTransaction { - s3: self, - runtime: Runtime::new()?, - }) + Ok(S3StorageTransaction { s3: self }) } } pub(super) struct S3StorageTransaction<'a> { s3: &'a S3Backend, - runtime: Runtime, } impl<'a> StorageTransaction for S3StorageTransaction<'a> { @@ -100,7 +102,7 @@ impl<'a> StorageTransaction for S3StorageTransaction<'a> { } attempts += 1; - match self.runtime.block_on(futures.map(drop).collect()) { + match self.s3.runtime.lock().block_on(futures.map(drop).collect()) { // this batch was successful, start another batch if there are still more files Ok(_) => break, Err(err) => { From 9c8d5129e11723c18ff53728e6bc73efa5883e23 Mon Sep 17 00:00:00 2001 From: Chase Wilson Date: Fri, 24 Jul 2020 20:59:58 -0500 Subject: [PATCH 2/2] Made the S3 backend use S3_RUNTIME directly --- src/storage/s3.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 3007770c3..4d09a170e 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -24,7 +24,6 @@ static S3_RUNTIME: Lazy> = pub(crate) struct S3Backend { client: S3Client, bucket: String, - runtime: &'static Mutex, } impl S3Backend { @@ -32,7 +31,6 @@ impl S3Backend { Self { client, bucket: bucket.into(), - runtime: &*S3_RUNTIME, } } @@ -102,7 +100,7 @@ impl<'a> StorageTransaction for S3StorageTransaction<'a> { } attempts += 1; - match self.s3.runtime.lock().block_on(futures.map(drop).collect()) { + match S3_RUNTIME.lock().block_on(futures.map(drop).collect()) { // this batch was successful, start another batch if there are still more files Ok(_) => break, Err(err) => { @@ -114,6 +112,7 @@ impl<'a> StorageTransaction for S3StorageTransaction<'a> { } } } + Ok(()) }