Skip to content

Commit 429a060

Browse files
authored
Parallel fetching of available versions (#2280)
Implements rate-limited parallel fetching of version information when solving. Also does speculative pre-fetching of version information of dependencies of the newest versions of each package. In informal benchmarks of pub get for a package whose only direct dependency is package:test the resolution time goes from 7.8 seconds to 1.5 seconds. Also adds package:pedantic to our direct runtime dependencies
1 parent 6705b08 commit 429a060

9 files changed

+532
-92
lines changed

doc/repository-spec-v2.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ repository.
6262
}
6363
```
6464

65-
### Inspect a specific version of a package
65+
### (Deprecated) Inspect a specific version of a package
66+
67+
**Deprecated** as of Dart 2.8, use "List all versions of a package" instead.
6668

6769
**GET** `<PUB_HOSTED_URL>/api/packages/<PACKAGE>/versions/<VERSION>`
6870

@@ -99,4 +101,3 @@ MUST support redirects.
99101

100102
The API for authenticating and publishing packages is not formalized yet, see
101103
[#1381](https://github.com/dart-lang/pub/issues/1381).
102-

lib/src/rate_limited_scheduler.dart

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
import 'dart:collection';
7+
8+
import 'package:pool/pool.dart';
9+
import 'package:pedantic/pedantic.dart';
10+
11+
/// Handles rate-limited scheduling of tasks.
12+
///
13+
/// Tasks are identified by a jobId of type [J] (should be useful as a Hash-key)
14+
/// and run with a supplied async function.
15+
///
16+
/// Designed to allow speculatively running tasks that will likely be needed
17+
/// later with [withPrescheduling].
18+
///
19+
/// Errors thrown by tasks scheduled with the `preschedule` callback will only
20+
/// be triggered when you await the [Future] returned by [schedule].
21+
///
22+
/// The operation will run in the [Zone] that the task was in when enqueued.
23+
///
24+
/// If a task if [preschedule]d and later [schedule]d before the operation is
25+
/// started, the task will go in front of the queue with the zone of the
26+
/// [schedule] operation.
27+
///
28+
/// Example:
29+
///
30+
/// ```dart
31+
/// // A scheduler that, given a uri, gets that page and returns the body
32+
/// final scheduler = RateLimitedScheduler(http.read);
33+
///
34+
/// scheduler.withPresceduling((preschedule) {
35+
/// // Start fetching `pub.dev` and `dart.dev` in the background.
36+
/// scheduler.preschedule(Uri.parse('https://pub.dev/'));
37+
/// scheduler.preschedule(Uri.parse('https://dart.dev/'));
38+
/// // ... do time-consuming task.
39+
/// // Now we actually need `pub.dev`.
40+
/// final pubDevBody =
41+
/// await scheduler.schedule(Uri.parse('https://pub.dev/'));
42+
/// // if the `dart.dev` task has not started yet, it will be canceled when
43+
/// // leaving `withPresceduling`.
44+
/// });
45+
/// ```
46+
class RateLimitedScheduler<J, V> {
47+
final Future<V> Function(J) _runJob;
48+
49+
/// The results of ongoing and finished jobs.
50+
final Map<J, Completer<V>> _cache = <J, Completer<V>>{};
51+
52+
/// Tasks that are waiting to be run.
53+
final Queue<_Task<J>> _queue = Queue<_Task<J>>();
54+
55+
/// Rate limits the number of concurrent jobs.
56+
final Pool _pool;
57+
58+
/// Jobs that have started running.
59+
final Set<J> _started = {};
60+
61+
RateLimitedScheduler(Future<V> Function(J) runJob,
62+
{maxConcurrentOperations = 10})
63+
: _runJob = runJob,
64+
_pool = Pool(maxConcurrentOperations);
65+
66+
/// Pick the next task in [_queue] and run it.
67+
///
68+
/// If the task is already in [_started] it will not be run again.
69+
Future<void> _processNextTask() async {
70+
if (_queue.isEmpty) {
71+
return;
72+
}
73+
final task = _queue.removeFirst();
74+
final completer = _cache[task.jobId];
75+
76+
if (!_started.add(task.jobId)) {
77+
return;
78+
}
79+
80+
// Use an async function to catch sync exceptions from _runJob.
81+
Future<V> runJob() async {
82+
return await task.zone.runUnary(_runJob, task.jobId);
83+
}
84+
85+
completer.complete(runJob());
86+
// Listen to errors on the completer:
87+
// this will make errors thrown by [_run] not
88+
// become uncaught.
89+
//
90+
// They will still show up for other listeners of the future.
91+
await completer.future.catchError((_) {});
92+
}
93+
94+
/// Calls [callback] with a function that can pre-schedule jobs.
95+
///
96+
/// When [callback] returns, all jobs that where prescheduled by [callback]
97+
/// that have not started running will be removed from the work queue
98+
/// (if they have been added seperately by [schedule] they will still be
99+
/// executed).
100+
Future<R> withPrescheduling<R>(
101+
FutureOr<R> Function(void Function(J) preschedule) callback,
102+
) async {
103+
final prescheduled = <_Task>{};
104+
try {
105+
return await callback((jobId) {
106+
if (_started.contains(jobId)) return;
107+
final task = _Task(jobId, Zone.current);
108+
_cache.putIfAbsent(jobId, () => Completer());
109+
_queue.addLast(task);
110+
prescheduled.add(task);
111+
112+
unawaited(_pool.withResource(_processNextTask));
113+
});
114+
} finally {
115+
_queue.removeWhere(prescheduled.contains);
116+
}
117+
}
118+
119+
/// Returns a future that completed with the result of running [jobId].
120+
///
121+
/// If [jobId] has already run, the cached result will be returned.
122+
/// If [jobId] is not yet running, it will go to the front of the work queue
123+
/// to be scheduled next when there are free resources.
124+
Future<V> schedule(J jobId) {
125+
final completer = _cache.putIfAbsent(jobId, () => Completer());
126+
if (!_started.contains(jobId)) {
127+
final task = _Task(jobId, Zone.current);
128+
_queue.addFirst(task);
129+
scheduleMicrotask(() => _pool.withResource(_processNextTask));
130+
}
131+
return completer.future;
132+
}
133+
}
134+
135+
class _Task<J> {
136+
final J jobId;
137+
final Zone zone;
138+
_Task(this.jobId, this.zone);
139+
140+
@override
141+
String toString() => jobId.toString();
142+
}

lib/src/solver/version_solver.dart

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,15 @@ class VersionSolver {
9191
[Term(PackageRange.root(_root), false)], IncompatibilityCause.root));
9292

9393
try {
94-
var next = _root.name;
95-
while (next != null) {
96-
_propagate(next);
97-
next = await _choosePackageVersion();
98-
}
94+
return await _systemCache.hosted.withPrefetching(() async {
95+
var next = _root.name;
96+
while (next != null) {
97+
_propagate(next);
98+
next = await _choosePackageVersion();
99+
}
99100

100-
return await _result();
101+
return await _result();
102+
});
101103
} finally {
102104
// Gather some solving metrics.
103105
log.solver('Version solving took ${stopwatch.elapsed} seconds.\n'

lib/src/source/git.dart

Lines changed: 49 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import 'dart:async';
66
import 'dart:io';
77

88
import 'package:path/path.dart' as p;
9+
import 'package:pool/pool.dart';
910
import 'package:pub_semver/pub_semver.dart';
1011

1112
import '../git.dart' as git;
@@ -189,6 +190,10 @@ class GitSource extends Source {
189190

190191
/// The [BoundSource] for [GitSource].
191192
class BoundGitSource extends CachedSource {
193+
/// Limit the number of concurrent git operations to 1.
194+
// TODO(sigurdm): Use RateLimitedScheduler.
195+
final Pool _pool = Pool(1);
196+
192197
@override
193198
final GitSource source;
194199

@@ -221,27 +226,31 @@ class BoundGitSource extends CachedSource {
221226

222227
@override
223228
Future<List<PackageId>> doGetVersions(PackageRef ref) async {
224-
await _ensureRepoCache(ref);
225-
var path = _repoCachePath(ref);
226-
var revision = await _firstRevision(path, ref.description['ref']);
227-
var pubspec =
228-
await _describeUncached(ref, revision, ref.description['path']);
229-
230-
return [
231-
PackageId(ref.name, source, pubspec.version, {
232-
'url': ref.description['url'],
233-
'ref': ref.description['ref'],
234-
'resolved-ref': revision,
235-
'path': ref.description['path']
236-
})
237-
];
229+
return await _pool.withResource(() async {
230+
await _ensureRepoCache(ref);
231+
var path = _repoCachePath(ref);
232+
var revision = await _firstRevision(path, ref.description['ref']);
233+
var pubspec =
234+
await _describeUncached(ref, revision, ref.description['path']);
235+
236+
return [
237+
PackageId(ref.name, source, pubspec.version, {
238+
'url': ref.description['url'],
239+
'ref': ref.description['ref'],
240+
'resolved-ref': revision,
241+
'path': ref.description['path']
242+
})
243+
];
244+
});
238245
}
239246

240247
/// Since we don't have an easy way to read from a remote Git repo, this
241248
/// just installs [id] into the system cache, then describes it from there.
242249
@override
243-
Future<Pubspec> describeUncached(PackageId id) => _describeUncached(
244-
id.toRef(), id.description['resolved-ref'], id.description['path']);
250+
Future<Pubspec> describeUncached(PackageId id) {
251+
return _pool.withResource(() => _describeUncached(
252+
id.toRef(), id.description['resolved-ref'], id.description['path']));
253+
}
245254

246255
/// Like [describeUncached], but takes a separate [ref] and Git [revision]
247256
/// rather than a single ID.
@@ -284,28 +293,32 @@ class BoundGitSource extends CachedSource {
284293
/// in `cache/`.
285294
@override
286295
Future<Package> downloadToSystemCache(PackageId id) async {
287-
var ref = id.toRef();
288-
if (!git.isInstalled) {
289-
fail("Cannot get ${id.name} from Git (${ref.description['url']}).\n"
290-
'Please ensure Git is correctly installed.');
291-
}
292-
293-
ensureDir(p.join(systemCacheRoot, 'cache'));
294-
await _ensureRevision(ref, id.description['resolved-ref']);
295-
296-
var revisionCachePath = _revisionCachePath(id);
297-
await _revisionCacheClones.putIfAbsent(revisionCachePath, () async {
298-
if (!entryExists(revisionCachePath)) {
299-
await _clone(_repoCachePath(ref), revisionCachePath);
300-
await _checkOut(revisionCachePath, id.description['resolved-ref']);
301-
_writePackageList(revisionCachePath, [id.description['path']]);
302-
} else {
303-
_updatePackageList(revisionCachePath, id.description['path']);
296+
return await _pool.withResource(() async {
297+
var ref = id.toRef();
298+
if (!git.isInstalled) {
299+
fail("Cannot get ${id.name} from Git (${ref.description['url']}).\n"
300+
'Please ensure Git is correctly installed.');
304301
}
305-
});
306302

307-
return Package.load(id.name,
308-
p.join(revisionCachePath, id.description['path']), systemCache.sources);
303+
ensureDir(p.join(systemCacheRoot, 'cache'));
304+
await _ensureRevision(ref, id.description['resolved-ref']);
305+
306+
var revisionCachePath = _revisionCachePath(id);
307+
await _revisionCacheClones.putIfAbsent(revisionCachePath, () async {
308+
if (!entryExists(revisionCachePath)) {
309+
await _clone(_repoCachePath(ref), revisionCachePath);
310+
await _checkOut(revisionCachePath, id.description['resolved-ref']);
311+
_writePackageList(revisionCachePath, [id.description['path']]);
312+
} else {
313+
_updatePackageList(revisionCachePath, id.description['path']);
314+
}
315+
});
316+
317+
return Package.load(
318+
id.name,
319+
p.join(revisionCachePath, id.description['path']),
320+
systemCache.sources);
321+
});
309322
}
310323

311324
/// Returns the path to the revision-specific cache of [id].

0 commit comments

Comments
 (0)