Skip to content

Fix concurrency issues in tool execution #2730

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 81 additions & 28 deletions lib/src/io_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
/// This is a helper library to make working with io easier.
library dartdoc.io_utils;

import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'dart:io' as io;

Expand Down Expand Up @@ -118,40 +120,91 @@ final RegExp partOfRegexp = RegExp('part of ');
'as Dartdoc 1.0.0')
final RegExp newLinePartOfRegexp = RegExp('\npart of ');

/// Best used with Future<void>.
class MultiFutureTracker<T> {
/// Approximate maximum number of simultaneous active Futures.
final int parallel;
/// The type of closures that a [TaskQueue] runs: an argumentless async
/// function producing a value of type [T].
typedef TaskQueueClosure<T> = Future<T> Function();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this makes things so much easier to read.


final Set<Future<T>> _trackedFutures = {};
/// A single queued task: pairs the user-supplied closure with the
/// [Completer] that exposes its result, plus a hook the [TaskQueue] uses to
/// learn when the task has finished (successfully or not).
class _TaskQueueItem<T> {
  _TaskQueueItem(this._closure, this._completer, {this.onComplete});

  /// The closure that performs the actual work.
  final TaskQueueClosure<T> _closure;

  /// Completes with the closure's result (or its error) once [run] finishes.
  final Completer<T> _completer;

  /// Invoked after the closure settles, whether it succeeded or threw.
  void Function() onComplete;

  /// Runs the closure, routing its result or error into [_completer].
  ///
  /// Errors are captured rather than rethrown so a failing task cannot take
  /// down the queue; callers observe the failure through the future returned
  /// by [TaskQueue.add].
  Future<void> run() async {
    try {
      _completer.complete(await _closure());
    } catch (e, stackTrace) {
      // Forward the stack trace to the caller instead of discarding it
      // (completeError(e) alone would lose where the failure happened).
      _completer.completeError(e, stackTrace);
    } finally {
      // Always notify the queue so the next pending task can start.
      onComplete?.call();
    }
  }
}

/// A task queue of Futures to be completed in parallel, throttling
/// the number of simultaneous tasks.
///
/// The tasks return results of type T.
/// A throttled work queue: closures added via [add] run concurrently, but
/// never more than [maxJobs] at a time.
///
/// Per-task results are surfaced through the futures returned by [add];
/// overall completion is observed through [tasksComplete].
class TaskQueue<T> {
  /// Creates a task queue running at most [maxJobs] tasks simultaneously.
  ///
  /// When [maxJobs] is omitted, the number of processor cores is used.
  TaskQueue({int maxJobs})
      : maxJobs = maxJobs ?? io.Platform.numberOfProcessors;

  /// The maximum number of jobs that this queue will run simultaneously.
  final int maxJobs;

  /// Tasks waiting for a free slot, started in FIFO order.
  final Queue<_TaskQueueItem<T>> _pendingTasks = Queue<_TaskQueueItem<T>>();

  /// Tasks that are currently running.
  final Set<_TaskQueueItem<T>> _activeTasks = <_TaskQueueItem<T>>{};

  /// Completers handed out by [tasksComplete]; all are completed (and the
  /// set cleared) as soon as the queue drains.
  final Set<Completer<void>> _completeListeners = <Completer<void>>{};

  /// Returns a future that completes when all tasks in the [TaskQueue] are
  /// complete.
  Future<void> get tasksComplete {
    // With nothing queued or running, signal completion immediately rather
    // than registering a listener that nothing would ever fire.
    if (_activeTasks.isEmpty && _pendingTasks.isEmpty) {
      return Future<void>.value();
    }
    final listener = Completer<void>();
    _completeListeners.add(listener);
    return listener.future;
  }

  /// Adds a single closure to the task queue, returning a future that
  /// completes when the task completes.
  Future<T> add(TaskQueueClosure<T> task) {
    final result = Completer<T>();
    _pendingTasks.add(_TaskQueueItem<T>(task, result));
    // Start work only if a slot is free; otherwise a finishing task will
    // pick this item up from the pending queue via its onComplete hook.
    if (_activeTasks.length < maxJobs) {
      _processTask();
    }
    return result.future;
  }

  /// Starts the next pending task if one exists and capacity allows;
  /// otherwise checks whether the queue has fully drained.
  void _processTask() {
    if (_pendingTasks.isEmpty || _activeTasks.length > maxJobs) {
      _checkForCompletion();
      return;
    }
    final next = _pendingTasks.removeFirst();
    _activeTasks.add(next);
    next.onComplete = () {
      // When this task settles, free its slot and try to start another.
      _activeTasks.remove(next);
      _processTask();
    };
    next.run();
  }

  /// Notifies everyone waiting on [tasksComplete] once the queue is empty.
  void _checkForCompletion() {
    if (_activeTasks.isEmpty && _pendingTasks.isEmpty) {
      for (final listener in _completeListeners) {
        if (!listener.isCompleted) {
          listener.complete();
        }
      }
      _completeListeners.clear();
    }
  }
}
4 changes: 3 additions & 1 deletion lib/src/model/package_graph.dart
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class PackageGraph with CommentReferable, Nameable {
// specialClasses handler so when we find them, they get added.
specialClasses = SpecialClasses();
// Go through docs of every ModelElement in package to pre-build the macros
// index. Uses toList() in order to get all the precaching on the stack.
// index.
await Future.wait(precacheLocalDocs());
_localDocumentationBuilt = true;

Expand Down Expand Up @@ -153,6 +153,8 @@ class PackageGraph with CommentReferable, Nameable {
}
yield* precacheOneElement(m);
}
// Now wait for any of the tasks still running to complete.
yield config.tools.runner.wait();
}

// Many ModelElements have the same ModelNode; don't build/cache this data more
Expand Down
18 changes: 8 additions & 10 deletions lib/src/tool_runner.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ typedef ToolErrorCallback = void Function(String message);
typedef FakeResultCallback = String Function(String tool,
{List<String> args, String content});

/// Set a ceiling on how many tool instances can be in progress at once,
/// limiting both parallelization and the number of open temporary files.
final MultiFutureTracker<void> _toolTracker = MultiFutureTracker(4);

class ToolTempFileTracker {
final ResourceProvider resourceProvider;
final Folder temporaryDirectory;
Expand Down Expand Up @@ -66,6 +62,12 @@ class ToolRunner {
/// generated by the tool.
ToolRunner(this.toolConfiguration);

/// Set a ceiling on how many tool instances can be in progress at once,
/// limiting both parallelization and the number of open temporary files.
static final TaskQueue<String> _toolTracker = TaskQueue<String>();

Future<void> wait() => _toolTracker.tasksComplete;

final ToolConfiguration toolConfiguration;

Future<void> _runSetup(
Expand Down Expand Up @@ -131,16 +133,12 @@ class ToolRunner {
Map<String, String> environment}) async {
assert(args != null);
assert(args.isNotEmpty);
Future<String> runner;
// Prevent too many tools from running simultaneously.
await _toolTracker.addFutureFromClosure(() {
runner = _run(args,
return _toolTracker.add(() {
return _run(args,
toolErrorCallback: toolErrorCallback,
content: content,
environment: environment);
return runner;
});
return runner;
}

Future<String> _run(List<String> args,
Expand Down
66 changes: 34 additions & 32 deletions test/io_utils_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,93 +18,95 @@ void main() {
});
});

group('MultiFutureTracker', () {
/// A special test designed to check shared [MultiFutureTracker]
group('TaskQueue', () {
/// A special test designed to check shared [TaskQueue]
/// behavior when exceptions occur after a delay in the passed closures to
/// [MultiFutureTracker.addFutureFromClosure].
/// [TaskQueue.add].
test('no deadlock when delayed exceptions fire in closures', () async {
var sharedTracker = MultiFutureTracker(2);
var sharedTracker = TaskQueue(maxJobs: 2);
expect(() async {
var t =
Future.delayed(Duration(milliseconds: 10), () => throw Exception());
await sharedTracker.addFutureFromClosure(() => t);
await sharedTracker.add(() => t);
return t;
}, throwsA(const TypeMatcher<Exception>()));
expect(() async {
var t =
Future.delayed(Duration(milliseconds: 10), () => throw Exception());
await sharedTracker.addFutureFromClosure(() => t);
await sharedTracker.add(() => t);
return t;
}, throwsA(const TypeMatcher<Exception>()));
expect(() async {
var t =
Future.delayed(Duration(milliseconds: 10), () => throw Exception());
// ignore: empty_catches
await sharedTracker.addFutureFromClosure(() => t);
await sharedTracker.add(() => t);
return t;
}, throwsA(const TypeMatcher<Exception>()));
expect(() async {
var t =
Future.delayed(Duration(milliseconds: 10), () => throw Exception());
await sharedTracker.addFutureFromClosure(() => t);
await sharedTracker.add(() => t);
return t;
}, throwsA(const TypeMatcher<Exception>()));

/// We deadlock here if the exception is not handled properly.
await sharedTracker.wait();
await sharedTracker.tasksComplete;
});

test('basic sequential processing works with no deadlock', () async {
var completed = <int>{};
var tracker = MultiFutureTracker(1);
await tracker.addFutureFromClosure(() async => completed.add(1));
await tracker.addFutureFromClosure(() async => completed.add(2));
await tracker.addFutureFromClosure(() async => completed.add(3));
await tracker.wait();
var tracker = TaskQueue(maxJobs: 1);
await tracker.add(() async => completed.add(1));
await tracker.add(() async => completed.add(2));
await tracker.add(() async => completed.add(3));
await tracker.tasksComplete;
expect(completed.length, equals(3));
});

test('basic sequential processing works on exceptions', () async {
var completed = <int>{};
var tracker = MultiFutureTracker(1);
await tracker.addFutureFromClosure(() async => completed.add(0));
await tracker
.addFutureFromClosure(() async => throw Exception())
.catchError((e) {});
await tracker
.addFutureFromClosure(() async => throw Exception())
.catchError((e) {});
await tracker.addFutureFromClosure(() async => completed.add(3));
await tracker.wait();
var tracker = TaskQueue(maxJobs: 1);
await tracker.add(() async => completed.add(0));
await tracker.add(() async => throw Exception()).catchError((e) {});
await tracker.add(() async => throw Exception()).catchError((e) {});
await tracker.add(() async => completed.add(3));
await tracker.tasksComplete;
expect(completed.length, equals(2));
});

/// Verify that if there are more exceptions than the maximum number
/// of in-flight [Future]s that there is no deadlock.
test('basic parallel processing works with no deadlock', () async {
var completed = <int>{};
var tracker = MultiFutureTracker(10);
var tracker = TaskQueue(maxJobs: 10);
for (var i = 0; i < 100; i++) {
await tracker.addFutureFromClosure(() async => completed.add(i));
await tracker.add(() async => completed.add(i));
}
await tracker.wait();
await tracker.tasksComplete;
expect(completed.length, equals(100));
});

test('basic parallel processing works on exceptions', () async {
var completed = <int>{};
var tracker = MultiFutureTracker(10);
var tracker = TaskQueue(maxJobs: 10);
for (var i = 0; i < 50; i++) {
await tracker.addFutureFromClosure(() async => completed.add(i));
await tracker.add(() async => completed.add(i));
}
for (var i = 50; i < 65; i++) {
await tracker.addFutureFromClosure(() async => throw Exception());
try {
await tracker.add(() async => throw TestException());
} on TestException {
// Ignore
}
}
for (var i = 65; i < 100; i++) {
await tracker.addFutureFromClosure(() async => completed.add(i));
await tracker.add(() async => completed.add(i));
}
await tracker.wait();
await tracker.tasksComplete;
expect(completed.length, equals(85));
});
});
}

/// Marker exception thrown by test closures so tests can catch exactly this
/// type and ignore it, distinguishing deliberate failures from real errors.
class TestException implements Exception {}
12 changes: 6 additions & 6 deletions tool/grind.dart
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ final String defaultPubCache = Platform.environment['PUB_CACHE'] ??
path.context.resolveTildePath('~/.pub-cache');

/// Run no more than the number of processors available in parallel.
final MultiFutureTracker testFutures = MultiFutureTracker(
int.tryParse(Platform.environment['MAX_TEST_FUTURES'] ?? '') ??
final TaskQueue testFutures = TaskQueue(
maxJobs: int.tryParse(Platform.environment['MAX_TEST_FUTURES'] ?? '') ??
Platform.numberOfProcessors);

// Directory.systemTemp is not a constant. So wrap it.
Expand Down Expand Up @@ -1114,21 +1114,21 @@ Future<void> tryPublish() async {
@Depends(clean)
Future<void> smokeTest() async {
await testDart2(smokeTestFiles);
await testFutures.wait();
await testFutures.tasksComplete;
}

/// Runs every non-smoke test, then blocks until all test subprocesses
/// queued on [testFutures] have finished.
@Task('Run non-smoke tests, only')
@Depends(clean)
Future<void> longTest() async {
await testDart2(testFiles);
// testDart2 only enqueues work onto the queue; wait for it to drain.
await testFutures.tasksComplete;
}
}

/// Runs the smoke tests followed by the full test suite, then blocks until
/// all test subprocesses queued on [testFutures] have finished.
@Task('Run all the tests.')
@Depends(clean)
Future<void> test() async {
await testDart2(smokeTestFiles.followedBy(testFiles));
// testDart2 only enqueues work onto the queue; wait for it to drain.
await testFutures.tasksComplete;
}
}

@Task('Clean up pub data from test directories')
Expand Down Expand Up @@ -1166,7 +1166,7 @@ Future<void> testDart2(Iterable<File> tests) async {
var parameters = <String>['--enable-asserts'];

for (var dartFile in tests) {
await testFutures.addFutureFromClosure(() =>
await testFutures.add(() =>
CoverageSubprocessLauncher('dart2-${path.basename(dartFile.path)}')
.runStreamed(Platform.resolvedExecutable,
<String>[...parameters, dartFile.path]));
Expand Down