/// The type of closure that can be scheduled on a [TaskQueue]: a nullary
/// function producing a `Future<T>`.
typedef TaskQueueClosure<T> = Future<T> Function();

/// A single queued task: the closure to run plus the completer that
/// publishes its result (or error) to the caller of [TaskQueue.add].
class _TaskQueueItem<T> {
  _TaskQueueItem(this._closure, this._completer, {this.onComplete});

  final TaskQueueClosure<T> _closure;
  final Completer<T> _completer;

  /// Invoked by the queue when this task finishes — successfully or not —
  /// so the queue can schedule the next pending task.
  void Function() onComplete;

  /// Runs the closure and forwards its outcome to [_completer].
  ///
  /// Errors are captured here (rather than allowed to escape) so that one
  /// failing task cannot deadlock or kill the queue; they surface on the
  /// future returned by [TaskQueue.add].
  Future<void> run() async {
    try {
      _completer.complete(await _closure());
    } catch (e, stackTrace) {
      // Forward the stack trace too, so callers see where the task failed,
      // not just what was thrown.
      _completer.completeError(e, stackTrace);
    } finally {
      onComplete?.call();
    }
  }
}

/// A task queue of Futures to be completed in parallel, throttling
/// the number of simultaneous tasks.
///
/// The tasks return results of type T.
class TaskQueue<T> {
  /// Creates a task queue with a maximum number of simultaneous jobs.
  /// The [maxJobs] parameter defaults to the number of CPU cores on the
  /// system.
  TaskQueue({int maxJobs})
      : maxJobs = maxJobs ?? io.Platform.numberOfProcessors;

  /// The maximum number of jobs that this queue will run simultaneously.
  final int maxJobs;

  /// Tasks waiting for a slot, in FIFO order.
  final Queue<_TaskQueueItem<T>> _pendingTasks = Queue<_TaskQueueItem<T>>();

  /// Tasks currently running; its size never needs to exceed [maxJobs].
  final Set<_TaskQueueItem<T>> _activeTasks = <_TaskQueueItem<T>>{};

  /// Completers handed out by [tasksComplete] that are still waiting.
  final Set<Completer<void>> _completeListeners = <Completer<void>>{};

  /// Returns a future that completes when all tasks in the [TaskQueue] are
  /// complete.
  Future<void> get tasksComplete {
    // In case this is called when there are no tasks, we want it to
    // signal complete immediately.
    if (_activeTasks.isEmpty && _pendingTasks.isEmpty) {
      return Future<void>.value();
    }
    final completer = Completer<void>();
    _completeListeners.add(completer);
    return completer.future;
  }

  /// Adds a single closure to the task queue, returning a future that
  /// completes when the task completes.
  Future<T> add(TaskQueueClosure<T> task) {
    final completer = Completer<T>();
    _pendingTasks.add(_TaskQueueItem<T>(task, completer));
    if (_activeTasks.length < maxJobs) {
      _processTask();
    }
    return completer.future;
  }

  /// Starts at most one pending task, re-arming itself via the task's
  /// [_TaskQueueItem.onComplete] callback so the queue drains itself.
  void _processTask() {
    if (_pendingTasks.isNotEmpty && _activeTasks.length <= maxJobs) {
      final item = _pendingTasks.removeFirst();
      _activeTasks.add(item);
      item.onComplete = () {
        _activeTasks.remove(item);
        _processTask();
      };
      // Intentionally not awaited: run() reports through the completer and
      // calls onComplete when done.
      item.run();
    } else {
      _checkForCompletion();
    }
  }

  /// Fires (and clears) every [tasksComplete] listener once the queue is
  /// fully drained.
  void _checkForCompletion() {
    if (_activeTasks.isEmpty && _pendingTasks.isEmpty) {
      for (final completer in _completeListeners) {
        if (!completer.isCompleted) {
          completer.complete();
        }
      }
      _completeListeners.clear();
    }
  }
}
ToolRunner(this.toolConfiguration); + /// Set a ceiling on how many tool instances can be in progress at once, + /// limiting both parallelization and the number of open temporary files. + static final TaskQueue _toolTracker = TaskQueue(); + + Future wait() => _toolTracker.tasksComplete; + final ToolConfiguration toolConfiguration; Future _runSetup( @@ -131,16 +133,12 @@ class ToolRunner { Map environment}) async { assert(args != null); assert(args.isNotEmpty); - Future runner; - // Prevent too many tools from running simultaneously. - await _toolTracker.addFutureFromClosure(() { - runner = _run(args, + return _toolTracker.add(() { + return _run(args, toolErrorCallback: toolErrorCallback, content: content, environment: environment); - return runner; }); - return runner; } Future _run(List args, diff --git a/test/io_utils_test.dart b/test/io_utils_test.dart index d22c95d9bf..ee24db4243 100644 --- a/test/io_utils_test.dart +++ b/test/io_utils_test.dart @@ -18,64 +18,60 @@ void main() { }); }); - group('MultiFutureTracker', () { - /// A special test designed to check shared [MultiFutureTracker] + group('TaskQueue', () { + /// A special test designed to check shared [TaskQueue] /// behavior when exceptions occur after a delay in the passed closures to - /// [MultiFutureTracker.addFutureFromClosure]. + /// [TaskQueue.add]. 
test('no deadlock when delayed exceptions fire in closures', () async { - var sharedTracker = MultiFutureTracker(2); + var sharedTracker = TaskQueue(maxJobs: 2); expect(() async { var t = Future.delayed(Duration(milliseconds: 10), () => throw Exception()); - await sharedTracker.addFutureFromClosure(() => t); + await sharedTracker.add(() => t); return t; }, throwsA(const TypeMatcher())); expect(() async { var t = Future.delayed(Duration(milliseconds: 10), () => throw Exception()); - await sharedTracker.addFutureFromClosure(() => t); + await sharedTracker.add(() => t); return t; }, throwsA(const TypeMatcher())); expect(() async { var t = Future.delayed(Duration(milliseconds: 10), () => throw Exception()); // ignore: empty_catches - await sharedTracker.addFutureFromClosure(() => t); + await sharedTracker.add(() => t); return t; }, throwsA(const TypeMatcher())); expect(() async { var t = Future.delayed(Duration(milliseconds: 10), () => throw Exception()); - await sharedTracker.addFutureFromClosure(() => t); + await sharedTracker.add(() => t); return t; }, throwsA(const TypeMatcher())); /// We deadlock here if the exception is not handled properly. 
- await sharedTracker.wait(); + await sharedTracker.tasksComplete; }); test('basic sequential processing works with no deadlock', () async { var completed = {}; - var tracker = MultiFutureTracker(1); - await tracker.addFutureFromClosure(() async => completed.add(1)); - await tracker.addFutureFromClosure(() async => completed.add(2)); - await tracker.addFutureFromClosure(() async => completed.add(3)); - await tracker.wait(); + var tracker = TaskQueue(maxJobs: 1); + await tracker.add(() async => completed.add(1)); + await tracker.add(() async => completed.add(2)); + await tracker.add(() async => completed.add(3)); + await tracker.tasksComplete; expect(completed.length, equals(3)); }); test('basic sequential processing works on exceptions', () async { var completed = {}; - var tracker = MultiFutureTracker(1); - await tracker.addFutureFromClosure(() async => completed.add(0)); - await tracker - .addFutureFromClosure(() async => throw Exception()) - .catchError((e) {}); - await tracker - .addFutureFromClosure(() async => throw Exception()) - .catchError((e) {}); - await tracker.addFutureFromClosure(() async => completed.add(3)); - await tracker.wait(); + var tracker = TaskQueue(maxJobs: 1); + await tracker.add(() async => completed.add(0)); + await tracker.add(() async => throw Exception()).catchError((e) {}); + await tracker.add(() async => throw Exception()).catchError((e) {}); + await tracker.add(() async => completed.add(3)); + await tracker.tasksComplete; expect(completed.length, equals(2)); }); @@ -83,28 +79,34 @@ void main() { /// of in-flight [Future]s that there is no deadlock. 
test('basic parallel processing works with no deadlock', () async { var completed = {}; - var tracker = MultiFutureTracker(10); + var tracker = TaskQueue(maxJobs: 10); for (var i = 0; i < 100; i++) { - await tracker.addFutureFromClosure(() async => completed.add(i)); + await tracker.add(() async => completed.add(i)); } - await tracker.wait(); + await tracker.tasksComplete; expect(completed.length, equals(100)); }); test('basic parallel processing works on exceptions', () async { var completed = {}; - var tracker = MultiFutureTracker(10); + var tracker = TaskQueue(maxJobs: 10); for (var i = 0; i < 50; i++) { - await tracker.addFutureFromClosure(() async => completed.add(i)); + await tracker.add(() async => completed.add(i)); } for (var i = 50; i < 65; i++) { - await tracker.addFutureFromClosure(() async => throw Exception()); + try { + await tracker.add(() async => throw TestException()); + } on TestException { + // Ignore + } } for (var i = 65; i < 100; i++) { - await tracker.addFutureFromClosure(() async => completed.add(i)); + await tracker.add(() async => completed.add(i)); } - await tracker.wait(); + await tracker.tasksComplete; expect(completed.length, equals(85)); }); }); } + +class TestException implements Exception {} diff --git a/tool/grind.dart b/tool/grind.dart index 5a85091b42..a6607a0966 100644 --- a/tool/grind.dart +++ b/tool/grind.dart @@ -49,8 +49,8 @@ final String defaultPubCache = Platform.environment['PUB_CACHE'] ?? path.context.resolveTildePath('~/.pub-cache'); /// Run no more than the number of processors available in parallel. -final MultiFutureTracker testFutures = MultiFutureTracker( - int.tryParse(Platform.environment['MAX_TEST_FUTURES'] ?? '') ?? +final TaskQueue testFutures = TaskQueue( + maxJobs: int.tryParse(Platform.environment['MAX_TEST_FUTURES'] ?? '') ?? Platform.numberOfProcessors); // Directory.systemTemp is not a constant. So wrap it. 
// NOTE(review): return/parameter type arguments in this reconstruction were
// stripped during extraction and are inferred (`Future<void>`,
// `Iterable<File>`, `<String>[...]`) — confirm against the original source.

@Depends(clean)
Future<void> smokeTest() async {
  await testDart2(smokeTestFiles);
  // Wait for every test subprocess queued on [testFutures] to finish.
  await testFutures.tasksComplete;
}

@Task('Run non-smoke tests, only')
@Depends(clean)
Future<void> longTest() async {
  await testDart2(testFiles);
  await testFutures.tasksComplete;
}

@Task('Run all the tests.')
@Depends(clean)
Future<void> test() async {
  await testDart2(smokeTestFiles.followedBy(testFiles));
  await testFutures.tasksComplete;
}

/// Queues one coverage-wrapped `dart` subprocess per test file on
/// [testFutures]; callers must await [TaskQueue.tasksComplete] afterwards.
Future<void> testDart2(Iterable<File> tests) async {
  var parameters = <String>['--enable-asserts'];

  for (var dartFile in tests) {
    await testFutures.add(() =>
        CoverageSubprocessLauncher('dart2-${path.basename(dartFile.path)}')
            .runStreamed(Platform.resolvedExecutable,
                [...parameters, dartFile.path]));
  }
}