
Parallel fetching of available versions #2280


Merged (31 commits, Jan 14, 2020)
Commits
e9869e5
First try
sigurdm Dec 16, 2019
0fd682e
Remove spurious newline
sigurdm Dec 16, 2019
3a86142
Tidy retriever
sigurdm Dec 16, 2019
9a284a1
Handle no latest version in server response
sigurdm Dec 16, 2019
dc0b904
Handle no latest version in server response, take 2
sigurdm Dec 16, 2019
65c1ace
Make retriever private
sigurdm Dec 16, 2019
bc179d8
Add test for retriever error handling
sigurdm Dec 17, 2019
fb5d23b
Make minByAsync stable
sigurdm Dec 17, 2019
21ddd5c
Handle getting pubspec of missing package
sigurdm Dec 17, 2019
c6b4533
Store current zone in Retriever
sigurdm Dec 17, 2019
7b8249b
Fox doccomment
sigurdm Dec 17, 2019
72ac701
Address reviews
sigurdm Dec 19, 2019
670755a
Only run prefetching when 'in the zone'
sigurdm Dec 30, 2019
6bdcf53
Address review
sigurdm Dec 30, 2019
49d165e
Fix typecheck
sigurdm Jan 2, 2020
baedad2
Reuse parsed versions
sigurdm Jan 2, 2020
de6f1ad
Adfress review
sigurdm Jan 3, 2020
d33c74a
Improve scheduler
sigurdm Jan 3, 2020
618c822
Simplify processing loop
sigurdm Jan 6, 2020
9a4fdc9
Use correct symbol as zone-key
sigurdm Jan 6, 2020
893ffc2
Remove unused variable
sigurdm Jan 6, 2020
3652db6
Merge branch 'master' into parallel_get
sigurdm Jan 13, 2020
5c39d4c
Fix lints
sigurdm Jan 13, 2020
af10ead
Merge branch 'parallel_get' of github.com:sigurdm/pub into parallel_get
sigurdm Jan 13, 2020
de1905f
More lints
sigurdm Jan 13, 2020
7155aba
Improve docs
sigurdm Jan 13, 2020
2060d9c
Merge branch 'master' into parallel_get
sigurdm Jan 13, 2020
4bbf3b5
Address review
sigurdm Jan 14, 2020
fb122eb
Improve doc
sigurdm Jan 14, 2020
57b7e94
Only do one git operation at a time
sigurdm Jan 14, 2020
42d0ce5
Also restrict access from describeUncached
sigurdm Jan 14, 2020
5 changes: 3 additions & 2 deletions doc/repository-spec-v2.md
@@ -62,7 +62,9 @@ repository.
}
```

### Inspect a specific version of a package
### (Deprecated) Inspect a specific version of a package

**Deprecated** as of Dart 2.8; use "List all versions of a package" instead.

**GET** `<PUB_HOSTED_URL>/api/packages/<PACKAGE>/versions/<VERSION>`

@@ -99,4 +101,3 @@ MUST support redirects.

The API for authenticating and publishing packages is not formalized yet, see
[#1381](https://github.com/dart-lang/pub/issues/1381).

142 changes: 142 additions & 0 deletions lib/src/rate_limited_scheduler.dart
@@ -0,0 +1,142 @@
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:collection';

import 'package:pool/pool.dart';
import 'package:pedantic/pedantic.dart';

/// Handles rate-limited scheduling of tasks.
///
/// Tasks are identified by a jobId of type [J] (should be usable as a hash key)
/// and run with a supplied async function.
///
/// Designed to allow speculatively running tasks that will likely be needed
/// later with [withPrescheduling].
///
/// Errors thrown by tasks scheduled with the `preschedule` callback will only
/// be triggered when you await the [Future] returned by [schedule].
///
/// The operation will run in the [Zone] that the task was in when enqueued.
///
/// If a task is [preschedule]d and later [schedule]d before the operation is
/// started, the task will go to the front of the queue with the zone of the
/// [schedule] operation.
///
/// Example:
///
/// ```dart
/// // A scheduler that, given a uri, gets that page and returns the body
/// final scheduler = RateLimitedScheduler(http.read);
///
/// scheduler.withPrescheduling((preschedule) async {
/// // Start fetching `pub.dev` and `dart.dev` in the background.
/// preschedule(Uri.parse('https://pub.dev/'));
/// preschedule(Uri.parse('https://dart.dev/'));
/// // ... do time-consuming task.
/// // Now we actually need `pub.dev`.
/// final pubDevBody =
/// await scheduler.schedule(Uri.parse('https://pub.dev/'));
/// // If the `dart.dev` task has not started yet, it will be canceled when
/// // leaving `withPrescheduling`.
/// });
/// ```
class RateLimitedScheduler<J, V> {
final Future<V> Function(J) _runJob;

/// The results of ongoing and finished jobs.
final Map<J, Completer<V>> _cache = <J, Completer<V>>{};

/// Tasks that are waiting to be run.
final Queue<_Task<J>> _queue = Queue<_Task<J>>();

/// Rate limits the number of concurrent jobs.
final Pool _pool;

/// Jobs that have started running.
final Set<J> _started = {};

RateLimitedScheduler(Future<V> Function(J) runJob,
{maxConcurrentOperations = 10})
: _runJob = runJob,
_pool = Pool(maxConcurrentOperations);

/// Pick the next task in [_queue] and run it.
///
/// If the task is already in [_started] it will not be run again.
Future<void> _processNextTask() async {
if (_queue.isEmpty) {
return;
}
final task = _queue.removeFirst();
final completer = _cache[task.jobId];

if (!_started.add(task.jobId)) {
return;
}

// Use an async function to catch sync exceptions from _runJob.
Future<V> runJob() async {
return await task.zone.runUnary(_runJob, task.jobId);
}

Review comment (Member):
Why do we need a separate function for this? Can we instead do
completer.complete(task.zone.runUnary(_runJob, task.jobId));

Reply (sigurdm, Contributor Author):
It is to handle the case where _runJob throws sync exceptions. Added an explanatory comment.

(A minimal illustration of this behavior appears after this file's diff.)

completer.complete(runJob());
// Listen for errors on the completer: this prevents errors thrown by
// [runJob] from becoming uncaught.
//
// They will still show up for other listeners of the future.
await completer.future.catchError((_) {});
}

/// Calls [callback] with a function that can pre-schedule jobs.
///
/// When [callback] returns, all jobs that were prescheduled by [callback]
/// that have not started running will be removed from the work queue
/// (if they have been added separately by [schedule] they will still be
/// executed).
Future<R> withPrescheduling<R>(
FutureOr<R> Function(void Function(J) preschedule) callback,
) async {
final prescheduled = <_Task>{};
try {
return await callback((jobId) {
if (_started.contains(jobId)) return;
final task = _Task(jobId, Zone.current);
_cache.putIfAbsent(jobId, () => Completer());
_queue.addLast(task);
prescheduled.add(task);

unawaited(_pool.withResource(_processNextTask));
});
} finally {
_queue.removeWhere(prescheduled.contains);
}
}

/// Returns a future that completes with the result of running [jobId].
///
/// If [jobId] has already run, the cached result will be returned.
/// If [jobId] is not yet running, it will go to the front of the work queue
/// to be scheduled next when there are free resources.
Future<V> schedule(J jobId) {
final completer = _cache.putIfAbsent(jobId, () => Completer());
if (!_started.contains(jobId)) {
final task = _Task(jobId, Zone.current);
_queue.addFirst(task);
scheduleMicrotask(() => _pool.withResource(_processNextTask));
}
return completer.future;
}
}

class _Task<J> {
final J jobId;
final Zone zone;
_Task(this.jobId, this.zone);

@override
String toString() => jobId.toString();
}
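
The review exchange on `runJob` above turns on a subtle point: handing `task.zone.runUnary(_runJob, task.jobId)` straight to `completer.complete` would let a synchronous throw from `_runJob` escape at the call site instead of failing the completer's future. A minimal standalone sketch of that behavior; `badJob` is illustrative and not part of the PR:

```dart
import 'dart:async';

// An illustrative job that throws synchronously instead of returning a
// failed Future.
Future<int> badJob(int id) {
  throw StateError('sync failure for job $id');
}

Future<void> main() async {
  final completer = Completer<int>();

  // completer.complete(badJob(1)) would throw right here, before any
  // Future exists, so the error would never reach completer.future.
  //
  // Wrapping the call in an async function converts the synchronous
  // throw into an error on the returned Future, which then completes
  // the completer with that error.
  Future<int> runJob() async => badJob(1);
  completer.complete(runJob());

  try {
    await completer.future;
  } on StateError catch (e) {
    print('caught via the completer: $e');
  }
}
```

This is the same trick the `runJob` wrapper in `_processNextTask` applies to `_runJob`.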
14 changes: 8 additions & 6 deletions lib/src/solver/version_solver.dart
@@ -91,13 +91,15 @@ class VersionSolver {
[Term(PackageRange.root(_root), false)], IncompatibilityCause.root));

try {
var next = _root.name;
while (next != null) {
_propagate(next);
next = await _choosePackageVersion();
}
return await _systemCache.hosted.withPrefetching(() async {
var next = _root.name;
while (next != null) {
_propagate(next);
next = await _choosePackageVersion();
}

return await _result();
return await _result();
});
} finally {
// Gather some solving metrics.
log.solver('Version solving took ${stopwatch.elapsed} seconds.\n'
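In the version_solver.dart hunk above, the solver loop is now wrapped in `_systemCache.hosted.withPrefetching(...)`. That method is not among the hunks shown on this page; presumably it gives the solver access to the scheduler's `withPrescheduling` so that version listings for packages discovered during solving can be fetched speculatively. A hypothetical sketch of such a wrapper, assuming a hosted source that keeps a `RateLimitedScheduler` keyed by package name (the `FakeHostedSource` class, its `_scheduler` field, and `versionsFor` are illustrative, not the PR's actual API):

```dart
import 'dart:async';

import 'rate_limited_scheduler.dart';

/// Illustrative only: how a hosted source might expose prefetching on top
/// of RateLimitedScheduler.
class FakeHostedSource {
  /// Fetches the version listing for a package name, rate limited.
  final RateLimitedScheduler<String, List<String>> _scheduler;

  FakeHostedSource(Future<List<String>> Function(String) fetchVersions)
      : _scheduler = RateLimitedScheduler(fetchVersions);

  /// Runs [callback] with speculative prefetching enabled.
  ///
  /// A real implementation would expose `preschedule` to the solver (the
  /// PR appears to pass it via a zone value) so that dependencies
  /// discovered during solving can be queued in the background.
  Future<T> withPrefetching<T>(Future<T> Function() callback) {
    return _scheduler.withPrescheduling((preschedule) async {
      // Prefetches queued through `preschedule` but never scheduled are
      // dropped when the callback returns.
      return await callback();
    });
  }

  /// On-demand lookups go through the same scheduler, so a prefetched
  /// listing is reused rather than fetched twice.
  Future<List<String>> versionsFor(String package) =>
      _scheduler.schedule(package);
}
```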
85 changes: 49 additions & 36 deletions lib/src/source/git.dart
@@ -6,6 +6,7 @@ import 'dart:async';
import 'dart:io';

import 'package:path/path.dart' as p;
import 'package:pool/pool.dart';
import 'package:pub_semver/pub_semver.dart';

import '../git.dart' as git;
@@ -189,6 +190,10 @@ class GitSource extends Source {

/// The [BoundSource] for [GitSource].
class BoundGitSource extends CachedSource {
/// Limit the number of concurrent git operations to 1.
// TODO(sigurdm): Use RateLimitedScheduler.
final Pool _pool = Pool(1);

@override
final GitSource source;

@@ -221,27 +226,31 @@

@override
Future<List<PackageId>> doGetVersions(PackageRef ref) async {
await _ensureRepoCache(ref);
var path = _repoCachePath(ref);
var revision = await _firstRevision(path, ref.description['ref']);
var pubspec =
await _describeUncached(ref, revision, ref.description['path']);

return [
PackageId(ref.name, source, pubspec.version, {
'url': ref.description['url'],
'ref': ref.description['ref'],
'resolved-ref': revision,
'path': ref.description['path']
})
];
return await _pool.withResource(() async {
await _ensureRepoCache(ref);
var path = _repoCachePath(ref);
var revision = await _firstRevision(path, ref.description['ref']);
var pubspec =
await _describeUncached(ref, revision, ref.description['path']);

return [
PackageId(ref.name, source, pubspec.version, {
'url': ref.description['url'],
'ref': ref.description['ref'],
'resolved-ref': revision,
'path': ref.description['path']
})
];
});
}

/// Since we don't have an easy way to read from a remote Git repo, this
/// just installs [id] into the system cache, then describes it from there.
@override
Future<Pubspec> describeUncached(PackageId id) => _describeUncached(
id.toRef(), id.description['resolved-ref'], id.description['path']);
Future<Pubspec> describeUncached(PackageId id) {
return _pool.withResource(() => _describeUncached(
id.toRef(), id.description['resolved-ref'], id.description['path']));
}

/// Like [describeUncached], but takes a separate [ref] and Git [revision]
/// rather than a single ID.
@@ -284,28 +293,32 @@
/// in `cache/`.
@override
Future<Package> downloadToSystemCache(PackageId id) async {
var ref = id.toRef();
if (!git.isInstalled) {
fail("Cannot get ${id.name} from Git (${ref.description['url']}).\n"
'Please ensure Git is correctly installed.');
}

ensureDir(p.join(systemCacheRoot, 'cache'));
await _ensureRevision(ref, id.description['resolved-ref']);

var revisionCachePath = _revisionCachePath(id);
await _revisionCacheClones.putIfAbsent(revisionCachePath, () async {
if (!entryExists(revisionCachePath)) {
await _clone(_repoCachePath(ref), revisionCachePath);
await _checkOut(revisionCachePath, id.description['resolved-ref']);
_writePackageList(revisionCachePath, [id.description['path']]);
} else {
_updatePackageList(revisionCachePath, id.description['path']);
return await _pool.withResource(() async {
var ref = id.toRef();
if (!git.isInstalled) {
fail("Cannot get ${id.name} from Git (${ref.description['url']}).\n"
'Please ensure Git is correctly installed.');
}
});

return Package.load(id.name,
p.join(revisionCachePath, id.description['path']), systemCache.sources);
ensureDir(p.join(systemCacheRoot, 'cache'));
await _ensureRevision(ref, id.description['resolved-ref']);

var revisionCachePath = _revisionCachePath(id);
await _revisionCacheClones.putIfAbsent(revisionCachePath, () async {
if (!entryExists(revisionCachePath)) {
await _clone(_repoCachePath(ref), revisionCachePath);
await _checkOut(revisionCachePath, id.description['resolved-ref']);
_writePackageList(revisionCachePath, [id.description['path']]);
} else {
_updatePackageList(revisionCachePath, id.description['path']);
}
});

return Package.load(
id.name,
p.join(revisionCachePath, id.description['path']),
systemCache.sources);
});
}

/// Returns the path to the revision-specific cache of [id].