-
Notifications
You must be signed in to change notification settings - Fork 125
Fix concurrency issues in tool execution #2730
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,8 @@ | |
/// This is a helper library to make working with io easier. | ||
library dartdoc.io_utils; | ||
|
||
import 'dart:async'; | ||
import 'dart:collection'; | ||
import 'dart:convert'; | ||
import 'dart:io' as io; | ||
|
||
|
@@ -118,40 +120,102 @@ final RegExp partOfRegexp = RegExp('part of '); | |
'as Dartdoc 1.0.0') | ||
final RegExp newLinePartOfRegexp = RegExp('\npart of '); | ||
|
||
/// Best used with Future<void>. | ||
class MultiFutureTracker<T> { | ||
/// Approximate maximum number of simultaneous active Futures. | ||
final int parallel; | ||
typedef TaskQueueClosure<T> = Future<T> Function(); | ||
|
||
class _TaskQueueItem<T> { | ||
_TaskQueueItem(this._closure, this._completer, {this.onComplete}); | ||
|
||
final TaskQueueClosure<T> _closure; | ||
final Completer<T> _completer; | ||
void Function() onComplete; | ||
|
||
Future<void> run() async { | ||
try { | ||
final result = await _closure(); | ||
if (result != null) { | ||
gspencergoog marked this conversation as resolved.
Show resolved
Hide resolved
|
||
_completer.complete(result); | ||
} else { | ||
_completer.complete(null); | ||
} | ||
await Future<void>.microtask(() {}); | ||
gspencergoog marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} catch (e) { | ||
_completer.completeError(e); | ||
} finally { | ||
onComplete?.call(); | ||
} | ||
} | ||
} | ||
|
||
final Set<Future<T>> _trackedFutures = {}; | ||
/// A task queue of Futures to be completed in parallel, throttling | ||
/// the number of simultaneous tasks. | ||
/// | ||
/// The tasks return results of type T. | ||
class TaskQueue<T> { | ||
/// Creates a task queue with a maximum number of simultaneous jobs. | ||
/// The [maxJobs] parameter defaults to the number of CPU cores on the | ||
/// system. | ||
TaskQueue({int maxJobs}) | ||
: maxJobs = maxJobs ?? io.Platform.numberOfProcessors; | ||
gspencergoog marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
/// The maximum number of jobs that this queue will run simultaneously. | ||
final int maxJobs; | ||
|
||
final Queue<_TaskQueueItem<T>> _pendingTasks = Queue<_TaskQueueItem<T>>(); | ||
final Set<_TaskQueueItem<T>> _activeTasks = <_TaskQueueItem<T>>{}; | ||
final Set<Completer<void>> _completeListeners = <Completer<void>>{}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, this structure is more easily verified than the queue-length original version for waiting on completion. nice. |
||
|
||
/// Returns a future that completes when all tasks in the [TaskQueue] are | ||
/// complete. | ||
Future<void> get tasksComplete { | ||
// In case this is called when there are no tasks, we want it to | ||
// signal complete immediately. | ||
if (_activeTasks.isEmpty && _pendingTasks.isEmpty) { | ||
return Future<void>.value(); | ||
} | ||
final completer = Completer<void>(); | ||
_completeListeners.add(completer); | ||
return completer.future; | ||
} | ||
|
||
MultiFutureTracker(this.parallel); | ||
/// Adds a single closure to the task queue, returning a future that | ||
/// completes when the task completes. | ||
Future<T> add(TaskQueueClosure<T> task) { | ||
final completer = Completer<T>(); | ||
_pendingTasks.add(_TaskQueueItem<T>(task, completer)); | ||
_processTasks(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIUC this seems like it will spawn a lot of _processTasks() that could potentially do "duplicate" work in the case where the _processTask() from the onComplete has it covered. This is probably fine though as the duplicates are written to collapse and do nothing. The old version probably got into trouble doing things cleverly to avoid this... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, possibly, but it would also make sure that when a new task is added, it starts running right away. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. I was thinking of the case though when the maximum concurrency is reached and _processTasks() here doesn't have any useful effect. It seems like it should be OK just to inline _processTasks() here and eliminate an unawaited future that sometimes doesn't do anything. Your choice, and of course, only if it works just as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, good point, that makes sense. OK, I inlined it. |
||
return completer.future; | ||
} | ||
|
||
/// Wait until fewer or equal to this many Futures are outstanding. | ||
Future<void> _waitUntil(int max) async { | ||
while (_trackedFutures.length > max) { | ||
await Future.any(_trackedFutures); | ||
// Process a single task. | ||
void _processTask() { | ||
if (_pendingTasks.isNotEmpty && _activeTasks.length <= maxJobs) { | ||
final item = _pendingTasks.removeFirst(); | ||
_activeTasks.add(item); | ||
item.onComplete = () { | ||
_activeTasks.remove(item); | ||
_processTask(); | ||
}; | ||
item.run(); | ||
} else { | ||
_checkForCompletion(); | ||
} | ||
} | ||
|
||
/// Generates a [Future] from the given closure and adds it to the queue, | ||
/// once the queue is sufficiently empty. The returned future completes | ||
/// when the generated [Future] has been added to the queue. | ||
/// | ||
/// If the closure does not handle its own exceptions, other calls to | ||
/// [addFutureFromClosure] or [wait] may trigger an exception. | ||
Future<void> addFutureFromClosure(Future<T> Function() closure) async { | ||
await _waitUntil(parallel - 1); | ||
Future<void> future = closure(); | ||
_trackedFutures.add(future); | ||
// ignore: unawaited_futures | ||
future.then((f) { | ||
_trackedFutures.remove(future); | ||
}, onError: (s, e) { | ||
_trackedFutures.remove(future); | ||
}); | ||
void _checkForCompletion() { | ||
if (_activeTasks.isEmpty && _pendingTasks.isEmpty) { | ||
for (final completer in _completeListeners) { | ||
if (!completer.isCompleted) { | ||
completer.complete(); | ||
} | ||
} | ||
_completeListeners.clear(); | ||
} | ||
} | ||
|
||
/// Wait until all futures added so far have completed. | ||
Future<void> wait() async => await _waitUntil(0); | ||
// Process any pending tasks. | ||
Future<void> _processTasks() async { | ||
if (_activeTasks.length < maxJobs) { | ||
_processTask(); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, this makes things so much easier to read.