Parallel fetching of available versions #2280

Merged
merged 31 commits into from
Jan 14, 2020
e9869e5
First try
sigurdm Dec 16, 2019
0fd682e
Remove spurious newline
sigurdm Dec 16, 2019
3a86142
Tidy retriever
sigurdm Dec 16, 2019
9a284a1
Handle no latest version in server response
sigurdm Dec 16, 2019
dc0b904
Handle no latest version in server response, take 2
sigurdm Dec 16, 2019
65c1ace
Make retriever private
sigurdm Dec 16, 2019
bc179d8
Add test for retriever error handling
sigurdm Dec 17, 2019
fb5d23b
Make minByAsync stable
sigurdm Dec 17, 2019
21ddd5c
Handle getting pubspec of missing package
sigurdm Dec 17, 2019
c6b4533
Store current zone in Retriever
sigurdm Dec 17, 2019
7b8249b
Fox doccomment
sigurdm Dec 17, 2019
72ac701
Address reviews
sigurdm Dec 19, 2019
670755a
Only run prefetching when 'in the zone'
sigurdm Dec 30, 2019
6bdcf53
Address review
sigurdm Dec 30, 2019
49d165e
Fix typecheck
sigurdm Jan 2, 2020
baedad2
Reuse parsed versions
sigurdm Jan 2, 2020
de6f1ad
Adfress review
sigurdm Jan 3, 2020
d33c74a
Improve scheduler
sigurdm Jan 3, 2020
618c822
Simplify processing loop
sigurdm Jan 6, 2020
9a4fdc9
Use correct symbol as zone-key
sigurdm Jan 6, 2020
893ffc2
Remove unused variable
sigurdm Jan 6, 2020
3652db6
Merge branch 'master' into parallel_get
sigurdm Jan 13, 2020
5c39d4c
Fix lints
sigurdm Jan 13, 2020
af10ead
Merge branch 'parallel_get' of github.com:sigurdm/pub into parallel_get
sigurdm Jan 13, 2020
de1905f
More lints
sigurdm Jan 13, 2020
7155aba
Improve docs
sigurdm Jan 13, 2020
2060d9c
Merge branch 'master' into parallel_get
sigurdm Jan 13, 2020
4bbf3b5
Address review
sigurdm Jan 14, 2020
fb122eb
Improve doc
sigurdm Jan 14, 2020
57b7e94
Only do one git operation at a time
sigurdm Jan 14, 2020
42d0ce5
Also restrict access from describeUncached
sigurdm Jan 14, 2020
5 changes: 3 additions & 2 deletions doc/repository-spec-v2.md
@@ -62,7 +62,9 @@ repository.
}
```

### Inspect a specific version of a package
### (Deprecated) Inspect a specific version of a package

**Deprecated** as of Dart 2.8; use "List all versions of a package" instead.

**GET** `<PUB_HOSTED_URL>/api/packages/<PACKAGE>/versions/<VERSION>`

@@ -99,4 +101,3 @@ MUST support redirects.

The API for authenticating and publishing packages is not formalized yet, see
[#1381](https://github.com/dart-lang/pub/issues/1381).
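
With the per-version endpoint deprecated, a client can read a single version's pubspec out of the package listing instead. The sketch below is only an illustration: `pubspecFromListing` is a hypothetical helper, the sample body is made up, and the field names (`versions`, `pubspec`, `version`) are the ones `_getVersions` reads in this change. It matches versions by plain string equality, which is a simplification of comparing parsed versions.

```dart
import 'dart:convert';

/// Returns the pubspec map for [version] from a package-listing response
/// [body], or null if the listing has no such version.
///
/// Hypothetical helper; assumes a top-level `versions` list whose entries
/// each carry a `pubspec` map with a `version` string.
Map<String, dynamic>? pubspecFromListing(String body, String version) {
  final doc = jsonDecode(body);
  if (doc is! Map) return null;
  final versions = doc['versions'];
  if (versions is! List) return null;
  for (final entry in versions) {
    if (entry is! Map) continue;
    final pubspec = entry['pubspec'];
    // Simplification: compare version strings directly.
    if (pubspec is Map && pubspec['version'] == version) {
      return Map<String, dynamic>.from(pubspec);
    }
  }
  return null;
}

void main() {
  // Made-up listing body, trimmed to the fields used above.
  const body = '''
{
  "name": "foo",
  "latest": {"version": "1.1.0", "pubspec": {"name": "foo", "version": "1.1.0"}},
  "versions": [
    {"version": "1.0.0", "pubspec": {"name": "foo", "version": "1.0.0"}},
    {"version": "1.1.0", "pubspec": {"name": "foo", "version": "1.1.0"}}
  ]
}
''';
  print(pubspecFromListing(body, '1.0.0'));
  print(pubspecFromListing(body, '2.0.0')); // No such version in the listing.
}
```

One listing request then serves every version lookup for that package, which is what lets `describeUncached` in this change reuse the cached listing.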

159 changes: 159 additions & 0 deletions lib/src/retriever.dart
@@ -0,0 +1,159 @@
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import "dart:collection";

import "package:pool/pool.dart";
import "package:async/async.dart";

/// Handles rate-limited scheduling of tasks.
///
/// Tasks are named with a key of type [K] (which should be usable as a hash key) and
/// run with a supplied function producing a CancelableOperation.
///
/// Designed to allow prefetching of tasks that will likely be needed
/// later with [prefetch].
///
/// All current operations can be cancelled and future operations removed from
/// the queue with [stop].
///
/// Errors thrown by tasks scheduled with [prefetch] will only be triggered when
/// you await the Future returned by [fetch].
///
/// The operation will run in the [Zone] that the task was in when enqueued.
/// If a task is [prefetch]ed and later [fetch]ed before the operation is
/// started, the task will go in front of the queue with the zone of the [fetch]
/// operation.
///
/// Example:
///
/// ```dart
/// // A retriever that, given a uri, gets that page and returns the body
/// final retriever = Retriever(
/// (Uri uri, _) => CancelableOperation.fromFuture(http.read(uri)));
/// // Start fetching `pub.dev` in the background.
/// retriever.prefetch(Uri.parse('https://pub.dev/'));
/// // ... do time-consuming task.
///
/// // Now we actually need `pub.dev`.
/// final pubDevBody = await retriever.fetch(Uri.parse('https://pub.dev/'));
/// ```
class Retriever<K, V> {
final CancelableOperation<V> Function(K, Retriever) _run;

/// The results of ongoing and finished computations.
final Map<K, Completer<V>> _cache = <K, Completer<V>>{};

/// Operations that are waiting to run.
final Queue<_TaskWithZone<K>> _queue = Queue<_TaskWithZone<K>>();

/// Rate limits the downloads.
final Pool _pool;

/// The currently active operations.
final Map<K, CancelableOperation<V>> _active = <K, CancelableOperation<V>>{};

/// True when the processing loop is running.
bool _started = false;

Retriever(CancelableOperation<V> Function(K, Retriever) run,
{maxConcurrentOperations = 10})
: _run = run,
_pool = Pool(maxConcurrentOperations);

Retriever.nonCancelable(Future<V> Function(K, Retriever) run,
{maxConcurrentOperations = 10})
: this(
(key, retriever) =>
CancelableOperation.fromFuture(run(key, retriever)),
maxConcurrentOperations: maxConcurrentOperations);

/// Starts running operations from the queue, taking the first items first.
void _process() async {
assert(!_started);
_started = true;
while (_queue.isNotEmpty) {
final resource = await _pool.request();
// This checks if [stop] has been called while waiting for a resource.
if (!_started) {
resource.release();
break;
}
// Take the highest priority task from the queue.
final taskWithZone = _queue.removeFirst();
final task = taskWithZone.task;
// Create or get the completer to deliver the result to.
final completer = _cache.putIfAbsent(
task,
() => Completer()
// Listen to errors: this keeps errors thrown by [_run] from becoming
// uncaught.
// They will still show up for other listeners of the future.
..future.catchError((error) {}));
// Already done or already scheduled => do nothing.
if (completer.isCompleted || _active.containsKey(task)) {
resource.release();
continue;
}

// Start running the operation for [task] in the original [Zone].
final zone = taskWithZone.zone;
final operation = zone.runBinary(_run, task, this);
_active[task] = operation;
operation
.then(completer.complete, onError: completer.completeError)
.value
.whenComplete(() {
resource.release();
_active.remove(task);
});
}
_started = false;
}

/// Cancels all active computations, and clears the queue.
void stop() {
// Stop the processing loop.
_started = false;
// Cancel all active operations.
for (final operation in _active.values) {
operation.cancel();
}
// Do not process the rest of the queue.
_queue.clear();
}

/// Puts [task] in the back of the work queue.
///
/// Task will be processed when there are free resources, and other already
/// queued tasks are done.
void prefetch(K task) {
_queue.addLast(_TaskWithZone.current(task));
if (!_started) _process();
}

/// Returns the result of running [task].
///
/// If [task] is already done, the cached result will be returned.
/// If [task] is not yet active, it will go to the front of the work queue
/// to be scheduled next when there are free resources.
Future<V> fetch(K task) {
final completer = _cache.putIfAbsent(task, () => Completer());
if (!completer.isCompleted) {
// We allow adding the same task twice to the queue.
// It will be deduplicated by the [_process] loop.
_queue.addFirst(_TaskWithZone.current(task));
if (!_started) _process();
}
return completer.future;
}
}

class _TaskWithZone<K> {
final K task;
final Zone zone;
_TaskWithZone(this.task, this.zone);
_TaskWithZone.current(K task) : this(task, Zone.current);
}
3 changes: 3 additions & 0 deletions lib/src/solver/version_solver.dart
@@ -99,6 +99,9 @@ class VersionSolver {

return await _result();
} finally {
// Stop pre-fetching package/version listings from hosted repository, as
// resolution is done.
_systemCache.hosted.stopPrefetching();
// Gather some solving metrics.
log.solver('Version solving took ${stopwatch.elapsed} seconds.\n'
'Tried ${_solution.attemptedSolutions} solutions.');
96 changes: 54 additions & 42 deletions lib/src/source/hosted.dart
@@ -8,6 +8,7 @@ import 'dart:io' as io;

import 'package:http/http.dart' as http;
import 'package:path/path.dart' as p;
import 'package:pub/src/retriever.dart';
import 'package:pub_semver/pub_semver.dart';
import 'package:stack_trace/stack_trace.dart';

@@ -144,35 +145,66 @@ class BoundHostedSource extends CachedSource {
final HostedSource source;

final SystemCache systemCache;
Retriever<PackageRef, Map<PackageId, Pubspec>> _retriever;

BoundHostedSource(this.source, this.systemCache);
BoundHostedSource(this.source, this.systemCache) {
_retriever =
Retriever.nonCancelable(_getVersions, maxConcurrentOperations: 10);
}

/// Downloads a list of all versions of a package that are available from the
/// site.
Future<List<PackageId>> doGetVersions(PackageRef ref) async {
Future<Map<PackageId, Pubspec>> _getVersions(
PackageRef ref, Retriever retriever) async {
var url = _makeUrl(
ref.description, (server, package) => "$server/api/packages/$package");

log.io("Get versions from $url.");

String body;
try {
// TODO(sigurdm): Implement cancellation of requests. This probably
// requires resolution of: https://github.com/dart-lang/sdk/issues/22265.
body = await httpClient.read(url, headers: pubApiHeaders);
} catch (error, stackTrace) {
var parsed = source._parseDescription(ref.description);
_throwFriendlyError(error, stackTrace, parsed.first, parsed.last);
}

var doc = jsonDecode(body);
return (doc['versions'] as List).map((map) {
final doc = jsonDecode(body);
final result = Map.fromEntries((doc['versions'] as List).map((map) {
var pubspec = Pubspec.fromMap(map['pubspec'], systemCache.sources,
expectedName: ref.name, location: url);
var id = source.idFor(ref.name, pubspec.version,
url: _serverFor(ref.description));
memoizePubspec(id, pubspec);
return MapEntry(id, pubspec);
}));

// Prefetch the dependencies of the latest version, we are likely to need
// them later.
void prefetch() {
final latest = doc['latest'];
if (latest is! Map) return;
final latestVersionString = latest['version'];
if (latestVersionString is! String) return;

final latestVersionId = PackageId(ref.name, source,
Version.parse(latestVersionString as String), ref.description);

final dependencies = result[latestVersionId]?.dependencies?.values ?? [];
if (dependencies.isNotEmpty) {
withDependencyType(DependencyType.none, () async {
for (final packageRange in dependencies) {
retriever.prefetch(packageRange.toRef());
}
});
}
}

return id;
}).toList();
prefetch();
return result;
}

/// Downloads a list of all versions of a package that are available from the
/// site.
Future<List<PackageId>> doGetVersions(PackageRef ref) async {
final versions = await _retriever.fetch(ref);
return versions.keys.toList();
}

/// Parses [description] into its server and package name components, then
@@ -186,26 +218,14 @@ class BoundHostedSource extends CachedSource {
return Uri.parse(pattern(server, package));
}

/// Downloads and parses the pubspec for a specific version of a package that
/// is available from the site.
/// Retrieves the pubspec for a specific version of a package that is
/// available from the site.
Future<Pubspec> describeUncached(PackageId id) async {
// Request it from the server.
var url = _makeVersionUrl(
id,
(server, package, version) =>
"$server/api/packages/$package/versions/$version");

log.io("Describe package at $url.");
Map<String, dynamic> version;
try {
version = jsonDecode(await httpClient.read(url, headers: pubApiHeaders));
} catch (error, stackTrace) {
var parsed = source._parseDescription(id.description);
_throwFriendlyError(error, stackTrace, id.name, parsed.last);
}

return Pubspec.fromMap(version['pubspec'], systemCache.sources,
expectedName: id.name, location: url);
final versions = await _retriever.fetch(id.toRef());
final url = _makeUrl(
id.description, (server, package) => "$server/api/packages/$package");
return versions[id] ??
(throw PackageNotFoundException("Could not find package $id at $url"));
}

/// Downloads the package identified by [id] to the system cache.
@@ -420,17 +440,9 @@ class BoundHostedSource extends CachedSource {
Uri _serverFor(description) =>
Uri.parse(source._parseDescription(description).last);

/// Parses [id] into its server, package name, and version components, then
/// converts that to a Uri given [pattern].
///
/// Ensures the package name is properly URL encoded.
Uri _makeVersionUrl(PackageId id,
String pattern(String server, String package, String version)) {
var parsed = source._parseDescription(id.description);
var server = parsed.last;
var package = Uri.encodeComponent(parsed.first);
var version = Uri.encodeComponent(id.version.toString());
return Uri.parse(pattern(server, package, version));
/// Stops the speculative prefetching of package versions.
void stopPrefetching() {
_retriever.stop();
}
}

19 changes: 13 additions & 6 deletions lib/src/utils.dart
@@ -292,20 +292,27 @@ T maxAll<T extends Comparable>(Iterable<T> iter,
.reduce((max, element) => compare(element, max) > 0 ? element : max);
}

/// Like [minBy], but with an asynchronous [orderBy] callback.
/// Returns the element of [values] for which [orderBy] returns the smallest
/// value.
///
/// Returns the first such value in case of ties.
///
/// Starts all the [orderBy] invocations in parallel.
Future<S> minByAsync<S, T>(
Iterable<S> values, Future<T> orderBy(S element)) async {
S minValue;
int minIndex;
T minOrderBy;
for (var element in values) {
var elementOrderBy = await orderBy(element);
final valuesList = values.toList();
// Map over the materialized list so a lazy iterable is traversed only once.
final orderByResults = await Future.wait(valuesList.map(orderBy));
for (var i = 0; i < orderByResults.length; i++) {
final elementOrderBy = orderByResults[i];
if (minOrderBy == null ||
(elementOrderBy as Comparable).compareTo(minOrderBy) < 0) {
minValue = element;
minIndex = i;
minOrderBy = elementOrderBy;
}
}
return minValue;
return valuesList[minIndex];
}

/// Like [List.sublist], but for any iterable.
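
The stable, parallel minimum described for `minByAsync` above can be sketched in null-safe Dart as follows. This is an illustration under assumptions, not the function from this change: `minByAsyncSketch` is a hypothetical name, the key type is bounded by `Comparable<T>` instead of being cast, and empty input returns null.

```dart
import 'dart:async';

/// Returns the element of [values] whose [orderBy] result is smallest,
/// preferring the earliest element on ties. Returns null for empty input.
///
/// Hypothetical sketch; all [orderBy] calls are started before any is awaited.
Future<S?> minByAsyncSketch<S, T extends Comparable<T>>(
    Iterable<S> values, Future<T> Function(S) orderBy) async {
  // Materialize once so indices line up with the awaited keys.
  final list = values.toList();
  if (list.isEmpty) return null;
  final keys = await Future.wait(list.map(orderBy));
  var minIndex = 0;
  for (var i = 1; i < keys.length; i++) {
    // Strict `<` keeps the earlier element when two keys compare equal.
    if (keys[i].compareTo(keys[minIndex]) < 0) minIndex = i;
  }
  return list[minIndex];
}

Future<void> main() async {
  final words = ['pear', 'fig', 'kiwi', 'yam'];
  // `num` is used as the key type because `int` implements Comparable<num>.
  final shortest =
      await minByAsyncSketch<String, num>(words, (w) async => w.length);
  print(shortest); // prints "fig": it ties with "yam" but comes first.
}
```

Tracking an index rather than the element itself is what makes the result well defined when the futures complete out of order, since `Future.wait` returns results in input order.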