Skip to content

Commit cd58b5e

Browse files
committed
Fix concurrency issues
1 parent 26bde83 commit cd58b5e

File tree

5 files changed

+133
-74
lines changed

5 files changed

+133
-74
lines changed

lib/src/io_utils.dart

Lines changed: 83 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
/// This is a helper library to make working with io easier.
66
library dartdoc.io_utils;
77

8+
import 'dart:async';
9+
import 'dart:collection';
810
import 'dart:convert';
911
import 'dart:io' as io;
1012

@@ -118,40 +120,92 @@ final RegExp partOfRegexp = RegExp('part of ');
118120
'as Dartdoc 1.0.0')
119121
final RegExp newLinePartOfRegexp = RegExp('\npart of ');
120122

121-
/// Best used with Future<void>.
122-
class MultiFutureTracker<T> {
123-
/// Approximate maximum number of simultaneous active Futures.
124-
final int parallel;
123+
typedef TaskQueueClosure<T> = Future<T> Function();
124+
typedef CompleteCallback = void Function();
125+
126+
class _TaskQueueItem<T> {
127+
_TaskQueueItem(this.closure, this.completer, {this.onComplete});
128+
129+
final TaskQueueClosure<T> closure;
130+
final Completer<T> completer;
131+
CompleteCallback onComplete;
132+
133+
Future<void> run() async {
134+
try {
135+
final result = await closure();
136+
if (result != null) {
137+
completer.complete(result);
138+
} else {
139+
completer.complete(null);
140+
}
141+
await Future<void>.microtask(() {});
142+
} catch (e) {
143+
completer.completeError(e);
144+
} finally {
145+
onComplete?.call();
146+
}
147+
}
148+
}
125149

126-
final Set<Future<T>> _trackedFutures = {};
150+
/// A task queue of Futures to be completed in parallel, throttling
151+
/// the number of simultaneous tasks.
152+
///
153+
/// The tasks return results of type T.
154+
class TaskQueue<T> {
155+
/// Creates a task queue with a maximum number of simultaneous jobs.
156+
/// The [maxJobs] parameter defaults to the number of CPU cores on the
157+
/// system.
158+
TaskQueue({int maxJobs}) : maxJobs = maxJobs ?? io.Platform.numberOfProcessors;
159+
160+
/// The maximum number of jobs that this queue will run simultaneously.
161+
final int maxJobs;
162+
163+
final Queue<_TaskQueueItem<T>> _pendingTasks = Queue<_TaskQueueItem<T>>();
164+
final Set<_TaskQueueItem<T>> _activeTasks = <_TaskQueueItem<T>>{};
165+
final Set<Completer<void>> _completeListeners = <Completer<void>>{};
166+
167+
/// Returns a future that completes when all tasks in the [TaskQueue] are
168+
/// complete.
169+
Future<void> get tasksComplete {
170+
final completer = Completer<void>();
171+
_completeListeners.add(completer);
172+
return completer.future;
173+
}
127174

128-
MultiFutureTracker(this.parallel);
175+
/// Adds a single closure to the task queue, returning a future that
176+
/// completes when the task completes.
177+
Future<T> add(TaskQueueClosure<T> task) {
178+
final completer = Completer<T>();
179+
_pendingTasks.add(_TaskQueueItem<T>(task, completer));
180+
_processTasks();
181+
return completer.future;
182+
}
129183

130-
/// Wait until fewer or equal to this many Futures are outstanding.
131-
Future<void> _waitUntil(int max) async {
132-
while (_trackedFutures.length > max) {
133-
await Future.any(_trackedFutures);
184+
// Process a single task.
185+
void _processTask() {
186+
if (_pendingTasks.isNotEmpty &&
187+
_activeTasks.length <= maxJobs) {
188+
final item = _pendingTasks.removeFirst();
189+
_activeTasks.add(item);
190+
item.onComplete = () {
191+
_activeTasks.remove(item);
192+
_processTask();
193+
};
194+
item.run();
195+
} else if (_activeTasks.isEmpty && _pendingTasks.isEmpty) {
196+
for (final completer in _completeListeners) {
197+
if (!completer.isCompleted) {
198+
completer.complete();
199+
}
200+
}
201+
_completeListeners.clear();
134202
}
135203
}
136204

137-
/// Generates a [Future] from the given closure and adds it to the queue,
138-
/// once the queue is sufficiently empty. The returned future completes
139-
/// when the generated [Future] has been added to the queue.
140-
///
141-
/// If the closure does not handle its own exceptions, other calls to
142-
/// [addFutureFromClosure] or [wait] may trigger an exception.
143-
Future<void> addFutureFromClosure(Future<T> Function() closure) async {
144-
await _waitUntil(parallel - 1);
145-
Future<void> future = closure();
146-
_trackedFutures.add(future);
147-
// ignore: unawaited_futures
148-
future.then((f) {
149-
_trackedFutures.remove(future);
150-
}, onError: (s, e) {
151-
_trackedFutures.remove(future);
152-
});
205+
// Process any pending tasks.
206+
Future<void> _processTasks() async {
207+
if (_activeTasks.length < maxJobs) {
208+
_processTask();
209+
}
153210
}
154-
155-
/// Wait until all futures added so far have completed.
156-
Future<void> wait() async => await _waitUntil(0);
157211
}

lib/src/model/package_graph.dart

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ class PackageGraph with CommentReferable, Nameable {
8686
// specialClasses handler so when we find them, they get added.
8787
specialClasses = SpecialClasses();
8888
// Go through docs of every ModelElement in package to pre-build the macros
89-
// index. Uses toList() in order to get all the precaching on the stack.
89+
// index.
9090
await Future.wait(precacheLocalDocs());
9191
_localDocumentationBuilt = true;
9292

@@ -153,6 +153,8 @@ class PackageGraph with CommentReferable, Nameable {
153153
}
154154
yield* precacheOneElement(m);
155155
}
156+
// Now wait for any of the tasks still running to complete.
157+
yield config.tools.runner.wait();
156158
}
157159

158160
// Many ModelElements have the same ModelNode; don't build/cache this data more

lib/src/tool_runner.dart

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@ typedef ToolErrorCallback = void Function(String message);
1717
typedef FakeResultCallback = String Function(String tool,
1818
{List<String> args, String content});
1919

20-
/// Set a ceiling on how many tool instances can be in progress at once,
21-
/// limiting both parallelization and the number of open temporary files.
22-
final MultiFutureTracker<void> _toolTracker = MultiFutureTracker(4);
23-
2420
class ToolTempFileTracker {
2521
final ResourceProvider resourceProvider;
2622
final Folder temporaryDirectory;
@@ -66,6 +62,12 @@ class ToolRunner {
6662
/// generated by the tool.
6763
ToolRunner(this.toolConfiguration);
6864

65+
/// Set a ceiling on how many tool instances can be in progress at once,
66+
/// limiting both parallelization and the number of open temporary files.
67+
static final TaskQueue<String> _toolTracker = TaskQueue<String>();
68+
69+
Future<void> wait() => _toolTracker.tasksComplete;
70+
6971
final ToolConfiguration toolConfiguration;
7072

7173
Future<void> _runSetup(
@@ -98,8 +100,7 @@ class ToolRunner {
98100
{@required ToolErrorCallback toolErrorCallback}) async {
99101
String commandString() => ([commandPath] + args).join(' ');
100102
try {
101-
var result =
102-
await Process.run(commandPath, args, environment: environment);
103+
var result = await Process.run(commandPath, args, environment: environment);
103104
if (result.exitCode != 0) {
104105
toolErrorCallback('Tool "$name" returned non-zero exit code '
105106
'(${result.exitCode}) when run as "${commandString()}" from '
@@ -131,16 +132,12 @@ class ToolRunner {
131132
Map<String, String> environment}) async {
132133
assert(args != null);
133134
assert(args.isNotEmpty);
134-
Future<String> runner;
135-
// Prevent too many tools from running simultaneously.
136-
await _toolTracker.addFutureFromClosure(() {
137-
runner = _run(args,
135+
return _toolTracker.add(() {
136+
return _run(args,
138137
toolErrorCallback: toolErrorCallback,
139138
content: content,
140139
environment: environment);
141-
return runner;
142140
});
143-
return runner;
144141
}
145142

146143
Future<String> _run(List<String> args,

test/io_utils_test.dart

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,90 +21,96 @@ void main() {
2121
group('MultiFutureTracker', () {
2222
/// A special test designed to check shared [MultiFutureTracker]
2323
/// behavior when exceptions occur after a delay in the passed closures to
24-
/// [MultiFutureTracker.addFutureFromClosure].
24+
/// [MultiFutureTracker.add].
2525
test('no deadlock when delayed exceptions fire in closures', () async {
26-
var sharedTracker = MultiFutureTracker(2);
26+
var sharedTracker = TaskQueue(maxJobs: 2);
2727
expect(() async {
2828
var t =
2929
Future.delayed(Duration(milliseconds: 10), () => throw Exception());
30-
await sharedTracker.addFutureFromClosure(() => t);
30+
await sharedTracker.add(() => t);
3131
return t;
3232
}, throwsA(const TypeMatcher<Exception>()));
3333
expect(() async {
3434
var t =
3535
Future.delayed(Duration(milliseconds: 10), () => throw Exception());
36-
await sharedTracker.addFutureFromClosure(() => t);
36+
await sharedTracker.add(() => t);
3737
return t;
3838
}, throwsA(const TypeMatcher<Exception>()));
3939
expect(() async {
4040
var t =
4141
Future.delayed(Duration(milliseconds: 10), () => throw Exception());
4242
// ignore: empty_catches
43-
await sharedTracker.addFutureFromClosure(() => t);
43+
await sharedTracker.add(() => t);
4444
return t;
4545
}, throwsA(const TypeMatcher<Exception>()));
4646
expect(() async {
4747
var t =
4848
Future.delayed(Duration(milliseconds: 10), () => throw Exception());
49-
await sharedTracker.addFutureFromClosure(() => t);
49+
await sharedTracker.add(() => t);
5050
return t;
5151
}, throwsA(const TypeMatcher<Exception>()));
5252

5353
/// We deadlock here if the exception is not handled properly.
54-
await sharedTracker.wait();
54+
await sharedTracker.tasksComplete;
5555
});
5656

5757
test('basic sequential processing works with no deadlock', () async {
5858
var completed = <int>{};
59-
var tracker = MultiFutureTracker(1);
60-
await tracker.addFutureFromClosure(() async => completed.add(1));
61-
await tracker.addFutureFromClosure(() async => completed.add(2));
62-
await tracker.addFutureFromClosure(() async => completed.add(3));
63-
await tracker.wait();
59+
var tracker = TaskQueue(maxJobs: 1);
60+
await tracker.add(() async => completed.add(1));
61+
await tracker.add(() async => completed.add(2));
62+
await tracker.add(() async => completed.add(3));
63+
await tracker.tasksComplete;
6464
expect(completed.length, equals(3));
6565
});
6666

6767
test('basic sequential processing works on exceptions', () async {
6868
var completed = <int>{};
69-
var tracker = MultiFutureTracker(1);
70-
await tracker.addFutureFromClosure(() async => completed.add(0));
69+
var tracker = TaskQueue(maxJobs: 1);
70+
await tracker.add(() async => completed.add(0));
7171
await tracker
72-
.addFutureFromClosure(() async => throw Exception())
72+
.add(() async => throw Exception())
7373
.catchError((e) {});
7474
await tracker
75-
.addFutureFromClosure(() async => throw Exception())
75+
.add(() async => throw Exception())
7676
.catchError((e) {});
77-
await tracker.addFutureFromClosure(() async => completed.add(3));
78-
await tracker.wait();
77+
await tracker.add(() async => completed.add(3));
78+
await tracker.tasksComplete;
7979
expect(completed.length, equals(2));
8080
});
8181

8282
/// Verify that if there are more exceptions than the maximum number
8383
/// of in-flight [Future]s that there is no deadlock.
8484
test('basic parallel processing works with no deadlock', () async {
8585
var completed = <int>{};
86-
var tracker = MultiFutureTracker(10);
86+
var tracker = TaskQueue(maxJobs: 10);
8787
for (var i = 0; i < 100; i++) {
88-
await tracker.addFutureFromClosure(() async => completed.add(i));
88+
await tracker.add(() async => completed.add(i));
8989
}
90-
await tracker.wait();
90+
await tracker.tasksComplete;
9191
expect(completed.length, equals(100));
9292
});
9393

9494
test('basic parallel processing works on exceptions', () async {
9595
var completed = <int>{};
96-
var tracker = MultiFutureTracker(10);
96+
var tracker = TaskQueue(maxJobs: 10);
9797
for (var i = 0; i < 50; i++) {
98-
await tracker.addFutureFromClosure(() async => completed.add(i));
98+
await tracker.add(() async => completed.add(i));
9999
}
100100
for (var i = 50; i < 65; i++) {
101-
await tracker.addFutureFromClosure(() async => throw Exception());
101+
try {
102+
await tracker.add(() async => throw TestException());
103+
} on TestException {
104+
// Ignore
105+
}
102106
}
103107
for (var i = 65; i < 100; i++) {
104-
await tracker.addFutureFromClosure(() async => completed.add(i));
108+
await tracker.add(() async => completed.add(i));
105109
}
106-
await tracker.wait();
110+
await tracker.tasksComplete;
107111
expect(completed.length, equals(85));
108112
});
109113
});
110114
}
115+
116+
class TestException implements Exception {}

tool/grind.dart

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ final String defaultPubCache = Platform.environment['PUB_CACHE'] ??
4949
path.context.resolveTildePath('~/.pub-cache');
5050

5151
/// Run no more than the number of processors available in parallel.
52-
final MultiFutureTracker testFutures = MultiFutureTracker(
53-
int.tryParse(Platform.environment['MAX_TEST_FUTURES'] ?? '') ??
52+
final TaskQueue testFutures = TaskQueue(
53+
maxJobs: int.tryParse(Platform.environment['MAX_TEST_FUTURES'] ?? '') ??
5454
Platform.numberOfProcessors);
5555

5656
// Directory.systemTemp is not a constant. So wrap it.
@@ -1114,21 +1114,21 @@ Future<void> tryPublish() async {
11141114
@Depends(clean)
11151115
Future<void> smokeTest() async {
11161116
await testDart2(smokeTestFiles);
1117-
await testFutures.wait();
1117+
await testFutures.tasksComplete;
11181118
}
11191119

11201120
@Task('Run non-smoke tests, only')
11211121
@Depends(clean)
11221122
Future<void> longTest() async {
11231123
await testDart2(testFiles);
1124-
await testFutures.wait();
1124+
await testFutures.tasksComplete;
11251125
}
11261126

11271127
@Task('Run all the tests.')
11281128
@Depends(clean)
11291129
Future<void> test() async {
11301130
await testDart2(smokeTestFiles.followedBy(testFiles));
1131-
await testFutures.wait();
1131+
await testFutures.tasksComplete;
11321132
}
11331133

11341134
@Task('Clean up pub data from test directories')
@@ -1166,7 +1166,7 @@ Future<void> testDart2(Iterable<File> tests) async {
11661166
var parameters = <String>['--enable-asserts'];
11671167

11681168
for (var dartFile in tests) {
1169-
await testFutures.addFutureFromClosure(() =>
1169+
await testFutures.add(() =>
11701170
CoverageSubprocessLauncher('dart2-${path.basename(dartFile.path)}')
11711171
.runStreamed(Platform.resolvedExecutable,
11721172
<String>[...parameters, dartFile.path]));

0 commit comments

Comments
 (0)