diff --git a/doc/repository-spec-v2.md b/doc/repository-spec-v2.md index 8b24da2bb..4f00f582e 100644 --- a/doc/repository-spec-v2.md +++ b/doc/repository-spec-v2.md @@ -62,7 +62,9 @@ repository. } ``` -### Inspect a specific version of a package +### (Deprecated) Inspect a specific version of a package + +**Deprecated** as of Dart 2.8, use "List all versions of a package" instead. **GET** `/api/packages//versions/` @@ -99,4 +101,3 @@ MUST support redirects. The API for authenticating and publishing packages is not formalized yet, see [#1381](https://github.com/dart-lang/pub/issues/1381). - diff --git a/lib/src/rate_limited_scheduler.dart b/lib/src/rate_limited_scheduler.dart new file mode 100644 index 000000000..99f9ce3d2 --- /dev/null +++ b/lib/src/rate_limited_scheduler.dart @@ -0,0 +1,142 @@ +// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:collection'; + +import 'package:pool/pool.dart'; +import 'package:pedantic/pedantic.dart'; + +/// Handles rate-limited scheduling of tasks. +/// +/// Tasks are identified by a jobId of type [J] (should be useful as a Hash-key) +/// and run with a supplied async function. +/// +/// Designed to allow speculatively running tasks that will likely be needed +/// later with [withPrescheduling]. +/// +/// Errors thrown by tasks scheduled with the `preschedule` callback will only +/// be triggered when you await the [Future] returned by [schedule]. +/// +/// The operation will run in the [Zone] that the task was in when enqueued. +/// +/// If a task if [preschedule]d and later [schedule]d before the operation is +/// started, the task will go in front of the queue with the zone of the +/// [schedule] operation. +/// +/// Example: +/// +/// ```dart +/// // A scheduler that, given a uri, gets that page and returns the body +/// final scheduler = RateLimitedScheduler(http.read); +/// +/// scheduler.withPresceduling((preschedule) { +/// // Start fetching `pub.dev` and `dart.dev` in the background. +/// scheduler.preschedule(Uri.parse('https://pub.dev/')); +/// scheduler.preschedule(Uri.parse('https://dart.dev/')); +/// // ... do time-consuming task. +/// // Now we actually need `pub.dev`. +/// final pubDevBody = +/// await scheduler.schedule(Uri.parse('https://pub.dev/')); +/// // if the `dart.dev` task has not started yet, it will be canceled when +/// // leaving `withPresceduling`. +/// }); +/// ``` +class RateLimitedScheduler { + final Future Function(J) _runJob; + + /// The results of ongoing and finished jobs. + final Map> _cache = >{}; + + /// Tasks that are waiting to be run. + final Queue<_Task> _queue = Queue<_Task>(); + + /// Rate limits the number of concurrent jobs. + final Pool _pool; + + /// Jobs that have started running. + final Set _started = {}; + + RateLimitedScheduler(Future Function(J) runJob, + {maxConcurrentOperations = 10}) + : _runJob = runJob, + _pool = Pool(maxConcurrentOperations); + + /// Pick the next task in [_queue] and run it. + /// + /// If the task is already in [_started] it will not be run again. + Future _processNextTask() async { + if (_queue.isEmpty) { + return; + } + final task = _queue.removeFirst(); + final completer = _cache[task.jobId]; + + if (!_started.add(task.jobId)) { + return; + } + + // Use an async function to catch sync exceptions from _runJob. + Future runJob() async { + return await task.zone.runUnary(_runJob, task.jobId); + } + + completer.complete(runJob()); + // Listen to errors on the completer: + // this will make errors thrown by [_run] not + // become uncaught. + // + // They will still show up for other listeners of the future. + await completer.future.catchError((_) {}); + } + + /// Calls [callback] with a function that can pre-schedule jobs. + /// + /// When [callback] returns, all jobs that where prescheduled by [callback] + /// that have not started running will be removed from the work queue + /// (if they have been added seperately by [schedule] they will still be + /// executed). + Future withPrescheduling( + FutureOr Function(void Function(J) preschedule) callback, + ) async { + final prescheduled = <_Task>{}; + try { + return await callback((jobId) { + if (_started.contains(jobId)) return; + final task = _Task(jobId, Zone.current); + _cache.putIfAbsent(jobId, () => Completer()); + _queue.addLast(task); + prescheduled.add(task); + + unawaited(_pool.withResource(_processNextTask)); + }); + } finally { + _queue.removeWhere(prescheduled.contains); + } + } + + /// Returns a future that completed with the result of running [jobId]. + /// + /// If [jobId] has already run, the cached result will be returned. + /// If [jobId] is not yet running, it will go to the front of the work queue + /// to be scheduled next when there are free resources. + Future schedule(J jobId) { + final completer = _cache.putIfAbsent(jobId, () => Completer()); + if (!_started.contains(jobId)) { + final task = _Task(jobId, Zone.current); + _queue.addFirst(task); + scheduleMicrotask(() => _pool.withResource(_processNextTask)); + } + return completer.future; + } +} + +class _Task { + final J jobId; + final Zone zone; + _Task(this.jobId, this.zone); + + @override + String toString() => jobId.toString(); +} diff --git a/lib/src/solver/version_solver.dart b/lib/src/solver/version_solver.dart index 675310c08..78c80a1cc 100644 --- a/lib/src/solver/version_solver.dart +++ b/lib/src/solver/version_solver.dart @@ -91,13 +91,15 @@ class VersionSolver { [Term(PackageRange.root(_root), false)], IncompatibilityCause.root)); try { - var next = _root.name; - while (next != null) { - _propagate(next); - next = await _choosePackageVersion(); - } + return await _systemCache.hosted.withPrefetching(() async { + var next = _root.name; + while (next != null) { + _propagate(next); + next = await _choosePackageVersion(); + } - return await _result(); + return await _result(); + }); } finally { // Gather some solving metrics. log.solver('Version solving took ${stopwatch.elapsed} seconds.\n' diff --git a/lib/src/source/git.dart b/lib/src/source/git.dart index f85e41fdc..2dfd4a328 100644 --- a/lib/src/source/git.dart +++ b/lib/src/source/git.dart @@ -6,6 +6,7 @@ import 'dart:async'; import 'dart:io'; import 'package:path/path.dart' as p; +import 'package:pool/pool.dart'; import 'package:pub_semver/pub_semver.dart'; import '../git.dart' as git; @@ -189,6 +190,10 @@ class GitSource extends Source { /// The [BoundSource] for [GitSource]. class BoundGitSource extends CachedSource { + /// Limit the number of concurrent git operations to 1. + // TODO(sigurdm): Use RateLimitedScheduler. + final Pool _pool = Pool(1); + @override final GitSource source; @@ -221,27 +226,31 @@ class BoundGitSource extends CachedSource { @override Future> doGetVersions(PackageRef ref) async { - await _ensureRepoCache(ref); - var path = _repoCachePath(ref); - var revision = await _firstRevision(path, ref.description['ref']); - var pubspec = - await _describeUncached(ref, revision, ref.description['path']); - - return [ - PackageId(ref.name, source, pubspec.version, { - 'url': ref.description['url'], - 'ref': ref.description['ref'], - 'resolved-ref': revision, - 'path': ref.description['path'] - }) - ]; + return await _pool.withResource(() async { + await _ensureRepoCache(ref); + var path = _repoCachePath(ref); + var revision = await _firstRevision(path, ref.description['ref']); + var pubspec = + await _describeUncached(ref, revision, ref.description['path']); + + return [ + PackageId(ref.name, source, pubspec.version, { + 'url': ref.description['url'], + 'ref': ref.description['ref'], + 'resolved-ref': revision, + 'path': ref.description['path'] + }) + ]; + }); } /// Since we don't have an easy way to read from a remote Git repo, this /// just installs [id] into the system cache, then describes it from there. @override - Future describeUncached(PackageId id) => _describeUncached( - id.toRef(), id.description['resolved-ref'], id.description['path']); + Future describeUncached(PackageId id) { + return _pool.withResource(() => _describeUncached( + id.toRef(), id.description['resolved-ref'], id.description['path'])); + } /// Like [describeUncached], but takes a separate [ref] and Git [revision] /// rather than a single ID. @@ -284,28 +293,32 @@ class BoundGitSource extends CachedSource { /// in `cache/`. @override Future downloadToSystemCache(PackageId id) async { - var ref = id.toRef(); - if (!git.isInstalled) { - fail("Cannot get ${id.name} from Git (${ref.description['url']}).\n" - 'Please ensure Git is correctly installed.'); - } - - ensureDir(p.join(systemCacheRoot, 'cache')); - await _ensureRevision(ref, id.description['resolved-ref']); - - var revisionCachePath = _revisionCachePath(id); - await _revisionCacheClones.putIfAbsent(revisionCachePath, () async { - if (!entryExists(revisionCachePath)) { - await _clone(_repoCachePath(ref), revisionCachePath); - await _checkOut(revisionCachePath, id.description['resolved-ref']); - _writePackageList(revisionCachePath, [id.description['path']]); - } else { - _updatePackageList(revisionCachePath, id.description['path']); + return await _pool.withResource(() async { + var ref = id.toRef(); + if (!git.isInstalled) { + fail("Cannot get ${id.name} from Git (${ref.description['url']}).\n" + 'Please ensure Git is correctly installed.'); } - }); - return Package.load(id.name, - p.join(revisionCachePath, id.description['path']), systemCache.sources); + ensureDir(p.join(systemCacheRoot, 'cache')); + await _ensureRevision(ref, id.description['resolved-ref']); + + var revisionCachePath = _revisionCachePath(id); + await _revisionCacheClones.putIfAbsent(revisionCachePath, () async { + if (!entryExists(revisionCachePath)) { + await _clone(_repoCachePath(ref), revisionCachePath); + await _checkOut(revisionCachePath, id.description['resolved-ref']); + _writePackageList(revisionCachePath, [id.description['path']]); + } else { + _updatePackageList(revisionCachePath, id.description['path']); + } + }); + + return Package.load( + id.name, + p.join(revisionCachePath, id.description['path']), + systemCache.sources); + }); } /// Returns the path to the revision-specific cache of [id]. diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart index e89244ded..b0eb73037 100644 --- a/lib/src/source/hosted.dart +++ b/lib/src/source/hosted.dart @@ -6,8 +6,11 @@ import 'dart:async'; import 'dart:convert'; import 'dart:io' as io; +import 'package:collection/collection.dart' show maxBy; import 'package:http/http.dart' as http; import 'package:path/path.dart' as p; +import 'package:pedantic/pedantic.dart'; +import 'package:pub/src/rate_limited_scheduler.dart'; import 'package:pub_semver/pub_semver.dart'; import 'package:stack_trace/stack_trace.dart'; @@ -154,36 +157,66 @@ class BoundHostedSource extends CachedSource { @override final SystemCache systemCache; + RateLimitedScheduler> _scheduler; - BoundHostedSource(this.source, this.systemCache); + BoundHostedSource(this.source, this.systemCache) { + _scheduler = + RateLimitedScheduler(_fetchVersions, maxConcurrentOperations: 10); + } - /// Downloads a list of all versions of a package that are available from the - /// site. - @override - Future> doGetVersions(PackageRef ref) async { + Future> _fetchVersions(PackageRef ref) async { var url = _makeUrl( ref.description, (server, package) => '$server/api/packages/$package'); - log.io('Get versions from $url.'); String body; try { + // TODO(sigurdm): Implement cancellation of requests. This probably + // requires resolution of: https://github.com/dart-lang/sdk/issues/22265. body = await httpClient.read(url, headers: pubApiHeaders); } catch (error, stackTrace) { var parsed = source._parseDescription(ref.description); _throwFriendlyError(error, stackTrace, parsed.first, parsed.last); } - - var doc = jsonDecode(body); - return (doc['versions'] as List).map((map) { + final doc = jsonDecode(body); + final versions = doc['versions'] as List; + final result = Map.fromEntries(versions.map((map) { var pubspec = Pubspec.fromMap(map['pubspec'], systemCache.sources, expectedName: ref.name, location: url); var id = source.idFor(ref.name, pubspec.version, url: _serverFor(ref.description)); - memoizePubspec(id, pubspec); + return MapEntry(id, pubspec); + })); + + // Prefetch the dependencies of the latest version, we are likely to need + // them later. + final preschedule = + Zone.current[_prefetchingKey] as void Function(PackageRef); + if (preschedule != null) { + final latestVersion = + maxBy(result.keys.map((id) => id.version), (e) => e); + + final latestVersionId = + PackageId(ref.name, source, latestVersion, ref.description); + + final dependencies = result[latestVersionId]?.dependencies?.values ?? []; + unawaited(withDependencyType(DependencyType.none, () async { + for (final packageRange in dependencies) { + if (packageRange.source is HostedSource) { + preschedule(packageRange.toRef()); + } + } + })); + } + return result; + } - return id; - }).toList(); + /// Downloads a list of all versions of a package that are available from the + /// site. + @override + Future> doGetVersions(PackageRef ref) async { + final versions = await _scheduler.schedule(ref); + return versions.keys.toList(); } /// Parses [description] into its server and package name components, then @@ -198,27 +231,15 @@ class BoundHostedSource extends CachedSource { return Uri.parse(pattern(server, package)); } - /// Downloads and parses the pubspec for a specific version of a package that - /// is available from the site. + /// Retrieves the pubspec for a specific version of a package that is + /// available from the site. @override Future describeUncached(PackageId id) async { - // Request it from the server. - var url = _makeVersionUrl( - id, - (server, package, version) => - '$server/api/packages/$package/versions/$version'); - - log.io('Describe package at $url.'); - Map version; - try { - version = jsonDecode(await httpClient.read(url, headers: pubApiHeaders)); - } catch (error, stackTrace) { - var parsed = source._parseDescription(id.description); - _throwFriendlyError(error, stackTrace, id.name, parsed.last); - } - - return Pubspec.fromMap(version['pubspec'], systemCache.sources, - expectedName: id.name, location: url); + final versions = await _scheduler.schedule(id.toRef()); + final url = _makeUrl( + id.description, (server, package) => '$server/api/packages/$package'); + return versions[id] ?? + (throw PackageNotFoundException('Could not find package $id at $url')); } /// Downloads the package identified by [id] to the system cache. @@ -441,18 +462,17 @@ class BoundHostedSource extends CachedSource { Uri _serverFor(description) => Uri.parse(source._parseDescription(description).last); - /// Parses [id] into its server, package name, and version components, then - /// converts that to a Uri given [pattern]. - /// - /// Ensures the package name is properly URL encoded. - Uri _makeVersionUrl(PackageId id, - String Function(String server, String package, String version) pattern) { - var parsed = source._parseDescription(id.description); - var server = parsed.last; - var package = Uri.encodeComponent(parsed.first); - var version = Uri.encodeComponent(id.version.toString()); - return Uri.parse(pattern(server, package, version)); + /// Enables speculative prefetching of dependencies of packages queried with + /// [getVersions]. + Future withPrefetching(Future Function() callback) async { + return await _scheduler.withPrescheduling((preschedule) async { + return await runZoned(callback, + zoneValues: {_prefetchingKey: preschedule}); + }); } + + /// Key for storing the current prefetch function in the current [Zone]. + static const _prefetchingKey = #_prefetch; } /// This is the modified hosted source used when pub get or upgrade are run diff --git a/lib/src/utils.dart b/lib/src/utils.dart index 14012820d..89687d897 100644 --- a/lib/src/utils.dart +++ b/lib/src/utils.dart @@ -295,20 +295,27 @@ T maxAll(Iterable iter, [int Function(T, T) compare]) { .reduce((max, element) => compare(element, max) > 0 ? element : max); } -/// Like [minBy], but with an asynchronous [orderBy] callback. +/// Returns the element of [values] for which [orderBy] returns the smallest +/// value. +/// +/// Returns the first such value in case of ties. +/// +/// Starts all the [orderBy] invocations in parallel. Future minByAsync( Iterable values, Future Function(S) orderBy) async { - S minValue; + int minIndex; T minOrderBy; - for (var element in values) { - var elementOrderBy = await orderBy(element); + List valuesList = values.toList(); + final orderByResults = await Future.wait(values.map(orderBy)); + for (var i = 0; i < orderByResults.length; i++) { + final elementOrderBy = orderByResults[i]; if (minOrderBy == null || (elementOrderBy as Comparable).compareTo(minOrderBy) < 0) { - minValue = element; + minIndex = i; minOrderBy = elementOrderBy; } } - return minValue; + return valuesList[minIndex]; } /// Like [List.sublist], but for any iterable. diff --git a/pubspec.yaml b/pubspec.yaml index d524cbef4..2d76599b1 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -20,6 +20,7 @@ dependencies: oauth2: ^1.0.0 package_config: ^1.0.0 path: ^1.2.0 + pedantic: ^1.9.0 pool: ^1.0.0 pub_semver: ^1.4.0 shelf: ^0.7.0 diff --git a/test/rate_limited_scheduler_test.dart b/test/rate_limited_scheduler_test.dart new file mode 100644 index 000000000..346c21b0d --- /dev/null +++ b/test/rate_limited_scheduler_test.dart @@ -0,0 +1,213 @@ +// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:test/test.dart'; +import 'package:pedantic/pedantic.dart'; +import 'package:pub/src/rate_limited_scheduler.dart'; + +void main() { + Map threeCompleters() => + {'a': Completer(), 'b': Completer(), 'c': Completer()}; + + test('scheduler is rate limited', () async { + final completers = threeCompleters(); + final isBeingProcessed = threeCompleters(); + + Future f(String i) async { + isBeingProcessed[i].complete(); + await completers[i].future; + return i.toUpperCase(); + } + + final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2); + await scheduler.withPrescheduling((preschedule) async { + preschedule('a'); + preschedule('b'); + preschedule('c'); + await Future.wait( + [isBeingProcessed['a'].future, isBeingProcessed['b'].future]); + expect(isBeingProcessed['c'].isCompleted, isFalse); + completers['a'].complete(); + await isBeingProcessed['c'].future; + completers['c'].complete(); + expect(await scheduler.schedule('c'), 'C'); + }); + }); + + test('scheduler.preschedule cancels unrun prescheduled task after callback', + () async { + final completers = threeCompleters(); + final isBeingProcessed = threeCompleters(); + + Future f(String i) async { + isBeingProcessed[i].complete(); + await completers[i].future; + return i.toUpperCase(); + } + + final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1); + + await scheduler.withPrescheduling((preschedule1) async { + await scheduler.withPrescheduling((preschedule2) async { + preschedule1('a'); + preschedule2('b'); + preschedule1('c'); + await isBeingProcessed['a'].future; + // b, c should not start processing due to rate-limiting. + expect(isBeingProcessed['b'].isCompleted, isFalse); + expect(isBeingProcessed['c'].isCompleted, isFalse); + }); + completers['a'].complete(); + // b is removed from the queue, now c should start processing. + await isBeingProcessed['c'].future; + completers['c'].complete(); + expect(await scheduler.schedule('c'), 'C'); + // b is not on the queue anymore. + expect(isBeingProcessed['b'].isCompleted, isFalse); + }); + }); + + test('scheduler.preschedule does not cancel tasks that are scheduled', + () async { + final completers = threeCompleters(); + final isBeingProcessed = threeCompleters(); + + Future f(String i) async { + isBeingProcessed[i].complete(); + await completers[i].future; + return i.toUpperCase(); + } + + final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1); + + Future b; + await scheduler.withPrescheduling((preschedule) async { + preschedule('a'); + preschedule('b'); + await isBeingProcessed['a'].future; + // b should not start processing due to rate-limiting. + expect(isBeingProcessed['b'].isCompleted, isFalse); + b = scheduler.schedule('b'); + }); + completers['a'].complete(); + expect(await scheduler.schedule('a'), 'A'); + // b was scheduled, so it should get processed now + await isBeingProcessed['b'].future; + completers['b'].complete(); + expect(await b, 'B'); + }); + + test('scheduler caches results', () async { + final completers = threeCompleters(); + final isBeingProcessed = threeCompleters(); + + Future f(String i) async { + isBeingProcessed[i].complete(); + await completers[i].future; + return i.toUpperCase(); + } + + final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2); + + completers['a'].complete(); + expect(await scheduler.schedule('a'), 'A'); + // Would fail if isBeingProcessed['a'] was completed twice + expect(await scheduler.schedule('a'), 'A'); + }); + + test('scheduler prioritizes fetched tasks before prefetched', () async { + final completers = threeCompleters(); + final isBeingProcessed = threeCompleters(); + + Future f(String i) async { + isBeingProcessed[i].complete(); + await completers[i].future; + return i.toUpperCase(); + } + + final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1); + await scheduler.withPrescheduling((preschedule) async { + preschedule('a'); + preschedule('b'); + await isBeingProcessed['a'].future; + final cResult = scheduler.schedule('c'); + expect(isBeingProcessed['b'].isCompleted, isFalse); + completers['a'].complete(); + completers['c'].complete(); + await isBeingProcessed['c'].future; + // 'c' is done before we allow 'b' to finish processing + expect(await cResult, 'C'); + }); + }); + + test('Errors trigger when the scheduled future is listened to', () async { + final completers = threeCompleters(); + final isBeingProcessed = threeCompleters(); + + Future f(String i) async { + isBeingProcessed[i].complete(); + await completers[i].future; + return i.toUpperCase(); + } + + final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2); + + await scheduler.withPrescheduling((preschedule) async { + preschedule('a'); + preschedule('b'); + preschedule('c'); + await isBeingProcessed['a'].future; + await isBeingProcessed['b'].future; + expect(isBeingProcessed['c'].isCompleted, isFalse); + unawaited(completers['c'].future.catchError((_) {})); + completers['c'].completeError('errorC'); + completers['a'].completeError('errorA'); + await isBeingProcessed['c'].future; + completers['b'].completeError('errorB'); + expect(() async => await scheduler.schedule('a'), throwsA('errorA')); + expect(() async => await scheduler.schedule('b'), throwsA('errorB')); + expect(() async => await scheduler.schedule('c'), throwsA('errorC')); + }); + }); + + test('tasks run in the zone they where enqueued in', () async { + final completers = threeCompleters(); + final isBeingProcessed = threeCompleters(); + + Future f(String i) async { + isBeingProcessed[i].complete(); + await completers[i].future; + return Zone.current['zoneValue']; + } + + final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2); + await scheduler.withPrescheduling((preschedule) async { + runZoned(() { + preschedule('a'); + }, zoneValues: {'zoneValue': 'A'}); + runZoned(() { + preschedule('b'); + }, zoneValues: {'zoneValue': 'B'}); + runZoned(() { + preschedule('c'); + }, zoneValues: {'zoneValue': 'C'}); + + await runZoned(() async { + await isBeingProcessed['a'].future; + await isBeingProcessed['b'].future; + // This will put 'c' in front of the queue, but in a zone with zoneValue + // bound to S. + final f = expectLater(scheduler.schedule('c'), completion('S')); + completers['a'].complete(); + completers['b'].complete(); + expect(await scheduler.schedule('a'), 'A'); + expect(await scheduler.schedule('b'), 'B'); + completers['c'].complete(); + await f; + }, zoneValues: {'zoneValue': 'S'}); + }); + }); +} diff --git a/test/utils_test.dart b/test/utils_test.dart index e7de8f05d..c0d60679d 100644 --- a/test/utils_test.dart +++ b/test/utils_test.dart @@ -2,6 +2,8 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'dart:async'; + import 'package:pub/src/utils.dart'; import 'package:test/test.dart'; @@ -122,4 +124,43 @@ b: {}''')); } }); }); + + group('minByAsync', () { + test('is stable', () async { + { + final completers = {}; + Completer completer(k) => completers.putIfAbsent(k, () => Completer()); + Future lengthWhenComplete(String s) async { + await completer(s).future; + return s.length; + } + + final w = expectLater( + minByAsync(['aa', 'a', 'b', 'ccc'], lengthWhenComplete), + completion('a')); + completer('aa').complete(); + completer('b').complete(); + completer('a').complete(); + completer('ccc').complete(); + await w; + } + { + final completers = {}; + Completer completer(k) => completers.putIfAbsent(k, () => Completer()); + Future lengthWhenComplete(String s) async { + await completer(s).future; + return s.length; + } + + final w = expectLater( + minByAsync(['aa', 'a', 'b', 'ccc'], lengthWhenComplete), + completion('a')); + completer('ccc').complete(); + completer('a').complete(); + completer('b').complete(); + completer('aa').complete(); + await w; + } + }); + }); }