-
Notifications
You must be signed in to change notification settings - Fork 125
Fix concurrency issues in tool execution #2730
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
ce7032c
to
cd58b5e
Compare
cd58b5e
to
f55aa5f
Compare
f55aa5f
to
a930149
Compare
6b5dde4
to
8625d68
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great implementation. Just a few comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I love this, but I'd love a review from @jcollins-g as well before we merge.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks really good, a definite improvement on my original implementation. Function typedefs for the win, too!
class MultiFutureTracker<T> { | ||
/// Approximate maximum number of simultaneous active Futures. | ||
final int parallel; | ||
typedef TaskQueueClosure<T> = Future<T> Function(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, this makes things so much easier to read.
|
||
final Queue<_TaskQueueItem<T>> _pendingTasks = Queue<_TaskQueueItem<T>>(); | ||
final Set<_TaskQueueItem<T>> _activeTasks = <_TaskQueueItem<T>>{}; | ||
final Set<Completer<void>> _completeListeners = <Completer<void>>{}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, this structure is more easily verified than the queue-length original version for waiting on completion. nice.
lib/src/io_utils.dart
Outdated
Future<T> add(TaskQueueClosure<T> task) { | ||
final completer = Completer<T>(); | ||
_pendingTasks.add(_TaskQueueItem<T>(task, completer)); | ||
_processTasks(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC this seems like it will spawn a lot of _processTasks() that could potentially do "duplicate" work in the case where the _processTask() from the onComplete has it covered. This is probably fine though as the duplicates are written to collapse and do nothing. The old version probably got into trouble doing things cleverly to avoid this...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, possibly, but it would also make sure that when a new task is added, it starts running right away.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. I was thinking of the case though when the maximum concurrency is reached and _processTasks() here doesn't have any useful effect. It seems like it should be OK just to inline _processTasks() here and eliminate an unawaited future that sometimes doesn't do anything. Your choice, and of course, only if it works just as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, good point, that makes sense. OK, I inlined it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm!
Description
This replaces the task queue called
MultiFutureTracker
with aTaskQueue
, which fixes the concurrency issue where the old class didn't really wait for tasks to complete before starting a new one.This uses a completer to provide a new future to wait for each task, uses a
Queue
to keep the tasks, and returns aFuture<T>
with the result of the task instead of waiting on aFuture<void>
.There is also a future to wait on that will complete when the queue is empty (replacing
wait
) calledtasksComplete
.Related Issues
Tests