Skip to content

Commit e024ce1

Browse files
committed
Unpack docker layers in parallel with downloading
1 parent 9bd9556 commit e024ce1

File tree

1 file changed

+99
-44
lines changed

1 file changed

+99
-44
lines changed

src/builder/commands/docker.rs

Lines changed: 99 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ const DEFAULT_IMAGE_NAMESPACE: &str = "library";
3939
const DEFAULT_IMAGE_TAG: &str = "latest";
4040

4141
const DOCKER_LAYERS_CACHE_PATH: &str = "/vagga/cache/docker-layers";
42-
const DOCKER_LAYERS_DOWNLOAD_CONCURRENCY: usize = 4;
42+
const DOCKER_LAYERS_DOWNLOAD_CONCURRENCY: usize = 2;
4343

4444
#[derive(Serialize, Deserialize, Debug)]
4545
pub struct DockerImage {
@@ -125,29 +125,26 @@ impl BuildStep for DockerImage {
125125

126126
#[cfg(feature="containers")]
127127
fn build(&self, guard: &mut Guard, _build: bool) -> Result<(), StepError> {
128-
let insecure = self.insecure.unwrap_or_else(|| {
128+
let insecure = self.insecure.unwrap_or_else(||
129129
is_insecure_registry(&self.registry, &guard.ctx.settings.docker_insecure_registries)
130-
});
130+
);
131131
if !insecure {
132132
capsule::ensure(&mut guard.ctx.capsule, &[capsule::Https])?;
133133
}
134134
Dir::new(DOCKER_LAYERS_CACHE_PATH)
135135
.recursive(true)
136136
.create()
137-
.expect("Docker layers cache dir");
138-
let layer_paths = tokio::runtime::Builder::new_current_thread()
137+
.map_err(|e|
138+
format!("Cannot create docker layers cache directory: {}", e)
139+
)?;
140+
let dst_path = Path::new("/vagga/root").join(&self.path.strip_prefix("/").unwrap());
141+
tokio::runtime::Builder::new_current_thread()
139142
.enable_all()
140143
.build()
141-
.expect("Tokio runtime")
142-
.block_on(download_image(&self.registry, insecure, &self.image, &self.tag))
143-
.expect("Downloaded layers");
144-
let dst_path = Path::new("/vagga/root").join(&self.path.strip_prefix("/").unwrap());
145-
for layer_path in layer_paths.iter() {
146-
TarCmd::new(layer_path, &dst_path)
147-
.preserve_owner(true)
148-
.entry_handler(whiteout_entry_handler)
149-
.unpack()?;
150-
}
144+
.map_err(|e| format!("Error creating tokio runtime: {}", e))?
145+
.block_on(download_and_unpack_image(
146+
&self.registry, insecure, &self.image, &self.tag, &dst_path
147+
))?;
151148
Ok(())
152149
}
153150

@@ -156,6 +153,16 @@ impl BuildStep for DockerImage {
156153
}
157154
}
158155

156+
fn is_insecure_registry(
157+
registry: &str, insecure_registries: &HashSet<String>
158+
) -> bool {
159+
let registry_host = match registry.split_once(':') {
160+
Some((host, _port)) => host,
161+
None => registry,
162+
};
163+
insecure_registries.contains(registry_host)
164+
}
165+
159166
/// See:
160167
/// - https://github.com/moby/moby/blob/v20.10.11/pkg/archive/whiteouts.go
161168
/// - https://github.com/moby/moby/blob/v20.10.11/pkg/archive/diff.go#L131
@@ -195,16 +202,10 @@ fn whiteout_entry_handler(entry: &Entry<Box<dyn Read>>, dst_path: &Path) -> Resu
195202
Ok(false)
196203
}
197204

198-
fn is_insecure_registry(registry: &str, insecure_registries: &HashSet<String>) -> bool {
199-
let registry_url = url::Url::parse(&format!("http://{}", registry)).unwrap();
200-
let registry_host = registry_url.domain().unwrap();
201-
insecure_registries.contains(registry_host)
202-
}
203-
204205
#[cfg(feature="containers")]
205-
async fn download_image(
206-
registry: &str, insecure: bool, image: &str, tag: &str
207-
) -> Result<Vec<PathBuf>, StepError> {
206+
async fn download_and_unpack_image(
207+
registry: &str, insecure: bool, image: &str, tag: &str, dst_path: &Path
208+
) -> Result<(), StepError> {
208209
let auth_scope = format!("repository:{}:pull", image);
209210
let client = build_client(registry, insecure, &[&auth_scope]).await?;
210211

@@ -216,22 +217,55 @@ async fn download_image(
216217
let layers_download_semaphore = Arc::new(
217218
Semaphore::new(DOCKER_LAYERS_DOWNLOAD_CONCURRENCY)
218219
);
219-
let layers_futures = layers_digests.iter()
220-
.map(|digest| {
221-
let image = image.to_string();
222-
let digest = digest.clone();
223-
let client = client.clone();
224-
let sem = layers_download_semaphore.clone();
225-
tokio::spawn(async move {
226-
if let Ok(_guard) = sem.acquire().await {
227-
info!("Downloading docker layer: {}", &digest);
228-
download_blob(&client, &image, &digest).await
229-
} else {
230-
panic!("Semaphore was closed unexpectedly")
220+
221+
use tokio::sync::oneshot;
222+
223+
let mut layers_futures = vec!();
224+
let mut unpack_channels = vec!();
225+
for digest in &layers_digests {
226+
let image = image.to_string();
227+
let digest = digest.clone();
228+
let client = client.clone();
229+
let sem = layers_download_semaphore.clone();
230+
let (tx, rx) = oneshot::channel();
231+
unpack_channels.push(rx);
232+
let download_future = tokio::spawn(async move {
233+
if let Ok(_guard) = sem.acquire().await {
234+
println!("Downloading docker layer: {}", &digest);
235+
match download_blob(&client, &image, &digest).await {
236+
Ok(layer_path) => {
237+
if let Err(_) = tx.send((digest.clone(), layer_path)) {
238+
return Err(format!("Error sending downloaded layer"));
239+
}
240+
Ok(())
241+
}
242+
Err(e) => Err(e)
243+
}
244+
} else {
245+
panic!("Semaphore was closed unexpectedly")
246+
}
247+
});
248+
layers_futures.push(download_future);
249+
}
250+
251+
let dst_path = dst_path.to_path_buf();
252+
let unpack_future = tokio::spawn(async move {
253+
for ch in unpack_channels {
254+
match ch.await {
255+
Ok((digest, layer_path)) => {
256+
let dst_path = dst_path.clone();
257+
if let Err(e) = unpack_layer(digest, layer_path, dst_path).await {
258+
return Err(e);
259+
}
231260
}
232-
})
233-
})
234-
.collect::<Vec<_>>();
261+
Err(e) => return Err(
262+
format!("Error waiting downloaded layer: {}", e)
263+
),
264+
}
265+
}
266+
Ok(())
267+
});
268+
235269
let mut layers_paths = vec!();
236270
let mut layers_errors = vec!();
237271
for layer_res in futures::future::join_all(layers_futures).await.into_iter() {
@@ -241,13 +275,32 @@ async fn download_image(
241275
Err(join_err) => layers_errors.push(format!("{}", join_err)),
242276
}
243277
}
278+
279+
unpack_future.await
280+
.map_err(|e| format!("Error waiting unpack future: {}", e))??;
281+
244282
if !layers_errors.is_empty() {
245283
Err(layers_errors.into())
246284
} else {
247-
Ok(layers_paths)
285+
Ok(())
248286
}
249287
}
250288

289+
async fn unpack_layer(
290+
digest: String, layer_path: PathBuf, dst_path: PathBuf
291+
) -> Result<(), String> {
292+
let unpack_future_res = tokio::task::spawn_blocking(move || {
293+
println!("Unpacking docker layer: {}", digest);
294+
TarCmd::new(&layer_path, &dst_path)
295+
.preserve_owner(true)
296+
.entry_handler(whiteout_entry_handler)
297+
.unpack()
298+
}).await;
299+
unpack_future_res
300+
.map_err(|e| format!("Error waiting a unpack layer future: {}", e))?
301+
.map_err(|e| format!("Error unpacking docker layer: {}", e))
302+
}
303+
251304
#[cfg(feature="containers")]
252305
async fn build_client(
253306
registry: &str, insecure: bool, auth_scopes: &[&str]
@@ -271,7 +324,9 @@ async fn build_client(
271324
async fn download_blob(
272325
client: &RegistryClient, image: &str, layer_digest: &str
273326
) -> Result<PathBuf, String> {
274-
let digest = layer_digest.split_once(':').unwrap().1;
327+
let digest = layer_digest.split_once(':')
328+
.ok_or(format!("Invalid layer digest: {}", layer_digest))?
329+
.1;
275330
let short_digest = &digest[..12];
276331

277332
let layers_cache = Path::new(DOCKER_LAYERS_CACHE_PATH);
@@ -298,11 +353,11 @@ async fn download_blob(
298353

299354
println!("Downloading docker blob: {}", &short_digest);
300355
let mut blob_stream = client.get_blob_stream(image, layer_digest).await
301-
.expect("Get blob response");
356+
.map_err(|e| format!("Error getting docker blob response: {}", e))?;
302357
let mut blob_file = tokio::fs::File::create(&blob_tmp_path).await
303-
.expect("Create layer file");
358+
.map_err(|e| format!("Cannot create layer file: {}", e))?;
304359
while let Some(chunk) = blob_stream.next().await {
305-
let chunk = chunk.expect("Layer chunk");
360+
let chunk = chunk.map_err(|e| format!("Error fetching layer chunk: {}", e))?;
306361
blob_file.write_all(&chunk).await
307362
.map_err(|e| format!("Cannot write blob file: {}", e))?;
308363
}

0 commit comments

Comments (0)