Skip to content

Commit 794dc9d

Browse files
committed
Only run at most 10 transformers at once in barback.
BUG=14320 [email protected] Review URL: https://codereview.chromium.org//36213002 git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@29060 260f80e4-7a28-3924-810f-c04153c831b5
1 parent 72d5dcb commit 794dc9d

File tree

6 files changed

+197
-134
lines changed

6 files changed

+197
-134
lines changed

pkg/barback/lib/src/file_pool.dart

Lines changed: 13 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ import 'dart:collection';
99
import 'dart:convert';
1010
import 'dart:io';
1111

12-
import 'package:stack_trace/stack_trace.dart';
13-
12+
import 'pool.dart';
1413
import 'utils.dart';
1514

1615
/// Manages a pool of files that are opened for reading to cope with maximum
@@ -21,42 +20,26 @@ import 'utils.dart';
2120
/// again. If this doesn't succeed after a certain amount of time, the open
2221
/// will fail and the original "too many files" exception will be thrown.
2322
class FilePool {
24-
/// [_FileReader]s whose last [listen] call failed and that are waiting for
25-
/// another file to close so they can be retried.
26-
final _pendingListens = new Queue<_FileReader>();
27-
28-
/// The timeout timer.
29-
///
30-
/// This timer is set as soon as the file limit is reached and is reset every
31-
/// time a file finishes being read or a new file is opened. If it fires, that
32-
/// indicates that the caller became deadlocked, likely due to files waiting
33-
/// for additional files to be read before they could be closed.
34-
Timer _timer;
35-
36-
/// The number of files currently open in the pool.
37-
int _openFiles = 0;
38-
39-
/// The maximum number of file descriptors that the pool will allocate.
23+
/// The underlying pool.
4024
///
41-
/// This is based on empirical tests that indicate that beyond 32, additional
42-
/// file reads don't provide substantial additional throughput.
43-
final int _maxOpenFiles = 32;
25+
/// The maximum number of allocated descriptors is based on empirical tests
26+
/// that indicate that beyond 32, additional file reads don't provide
27+
/// substantial additional throughput.
28+
final Pool _pool = new Pool(32, timeout: new Duration(seconds: 60));
4429

4530
/// Opens [file] for reading.
4631
///
4732
/// When the returned stream is listened to, if there are too many files
4833
/// open, this will wait for a previously opened file to be closed and then
4934
/// try again.
5035
Stream<List<int>> openRead(File file) {
51-
var reader = new _FileReader(this, file);
52-
if (_openFiles < _maxOpenFiles) {
53-
_openFiles++;
54-
reader.start();
55-
} else {
56-
_pendingListens.add(reader);
57-
_heartbeat();
58-
}
59-
return reader.stream;
36+
return futureStream(_pool.checkOut().then((resource) {
37+
return file.openRead().transform(new StreamTransformer.fromHandlers(
38+
handleDone: (sink) {
39+
sink.close();
40+
resource.release();
41+
}));
42+
}));
6043
}
6144

6245
/// Reads [file] as a string using [encoding].
@@ -79,107 +62,4 @@ class FilePool {
7962

8063
return completer.future;
8164
}
82-
83-
/// If there are any file reads that are waiting for available descriptors,
84-
/// this will allow the oldest one to start reading.
85-
void _startPendingListen() {
86-
if (_pendingListens.isEmpty) {
87-
_openFiles--;
88-
if (_timer != null) {
89-
_timer.cancel();
90-
_timer = null;
91-
}
92-
return;
93-
}
94-
95-
_heartbeat();
96-
var pending = _pendingListens.removeFirst();
97-
pending.start();
98-
}
99-
100-
/// Indicates that some external action has occurred and the timer should be
101-
/// restarted.
102-
void _heartbeat() {
103-
if (_timer != null) _timer.cancel();
104-
_timer = new Timer(new Duration(seconds: 60), _onTimeout);
105-
}
106-
107-
/// Handles [_timer] timing out by causing all pending file readers to emit
108-
/// exceptions.
109-
void _onTimeout() {
110-
for (var reader in _pendingListens) {
111-
reader.timeout();
112-
}
113-
_pendingListens.clear();
114-
_timer = null;
115-
}
116-
}
117-
118-
/// Wraps a raw file reading stream in a stream that handles "too many files"
119-
/// errors.
120-
///
121-
/// This also notifies the pool when the underlying file stream is closed so
122-
/// that it can try to open a waiting file.
123-
class _FileReader {
124-
final FilePool _pool;
125-
final File _file;
126-
127-
/// Whether the caller has paused this reader's stream.
128-
bool _isPaused = false;
129-
130-
/// The underyling file stream.
131-
Stream<List<int>> _fileStream;
132-
133-
/// The controller for the stream wrapper.
134-
StreamController<List<int>> _controller;
135-
136-
/// The current subscription to the underlying file stream.
137-
///
138-
/// This will only be non-null while the wrapped stream is being listened to.
139-
StreamSubscription _subscription;
140-
141-
/// The wrapped stream that the file can be read from.
142-
Stream<List<int>> get stream => _controller.stream;
143-
144-
_FileReader(this._pool, this._file) {
145-
_controller = new StreamController<List<int>>(onPause: () {
146-
_isPaused = true;
147-
if (_subscription != null) _subscription.pause();
148-
}, onResume: () {
149-
_isPaused = false;
150-
if (_subscription != null) _subscription.resume();
151-
}, onCancel: () {
152-
if (_subscription != null) _subscription.cancel();
153-
_subscription = null;
154-
}, sync: true);
155-
}
156-
157-
/// Starts listening to the underlying file stream.
158-
void start() {
159-
_fileStream = _file.openRead();
160-
_subscription = _fileStream.listen(_controller.add,
161-
onError: _onError, onDone: _onDone, cancelOnError: true);
162-
if (_isPaused) _subscription.pause();
163-
}
164-
165-
/// Emits a timeout exception.
166-
void timeout() {
167-
assert(_subscription == null);
168-
_controller.addError("FilePool deadlock: all file descriptors have been in "
169-
"use for too long.", new Trace.current().vmTrace);
170-
_controller.close();
171-
}
172-
173-
/// Forwards an error from the underlying file stream.
174-
void _onError(Object exception, StackTrace stackTrace) {
175-
_controller.addError(exception, stackTrace);
176-
_onDone();
177-
}
178-
179-
/// Handles the underlying file stream finishing.
180-
void _onDone() {
181-
_subscription = null;
182-
_controller.close();
183-
_pool._startPendingListen();
184-
}
18565
}

pkg/barback/lib/src/package_graph.dart

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import 'barback_logger.dart';
1414
import 'build_result.dart';
1515
import 'errors.dart';
1616
import 'package_provider.dart';
17+
import 'pool.dart';
1718
import 'transformer.dart';
1819
import 'utils.dart';
1920

@@ -65,6 +66,13 @@ class PackageGraph {
6566
/// [Future] returned by [getAllAssets].
6667
var _lastUnexpectedError;
6768

69+
// TODO(nweiz): Allow transformers to declare themselves as "lightweight" or
70+
// "heavyweight" and adjust their restrictions appropriately. Simple
71+
// transformers may be very efficient to run in parallel, whereas dart2js uses
72+
// a lot of memory and should be run more sequentially.
73+
/// A pool that controls how many transformers may be applied at once.
74+
final Pool transformPool = new Pool(10);
75+
6876
/// Creates a new [PackageGraph] that will transform assets in all packages
6977
/// made available by [provider].
7078
PackageGraph(this.provider, {BarbackLogger logger})

pkg/barback/lib/src/pool.dart

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import 'dart:async';
2+
import 'dart:collection';
3+
4+
import 'package:stack_trace/stack_trace.dart';
5+
6+
/// Manages an abstract pool of resources with a limit on how many may be in use
7+
/// at once.
8+
///
9+
/// When a resource is needed, the user should call [checkOut]. When the
10+
/// returned future completes with a [PoolResource], the resource may be
11+
/// allocated. Once the resource has been released, the user should call
12+
/// [PoolResource.release]. The pool will ensure that only a certain number of
13+
/// [PoolResource]s may be checked out at once.
14+
class Pool {
15+
/// Completers for checkouts beyond the first [_maxCheckedOutResources].
16+
///
17+
/// When an item is released, the next element of [_pendingResources] will be
18+
/// completed.
19+
final _pendingResources = new Queue<Completer<PoolResource>>();
20+
21+
/// The maximum number of resources that may be checked out at once.
22+
final int _maxCheckedOutResources;
23+
24+
/// The number of resources that are currently checked out.
25+
int _checkedOutResources = 0;
26+
27+
/// The timeout timer.
28+
///
29+
/// If [_timeout] isn't null, this timer is set as soon as the resource limit
30+
/// is reached and is reset every time an resource is released or a new
31+
/// resource is requested. If it fires, that indicates that the caller became
32+
/// deadlocked, likely due to files waiting for additional files to be read
33+
/// before they could be closed.
34+
Timer _timer;
35+
36+
/// The amount of time to wait before timing out the pending resources.
37+
Duration _timeout;
38+
39+
/// Creates a new pool with the given limit on how many resources may be
40+
/// checked out at once.
41+
///
42+
/// If [timeout] is passed, then if that much time passes without any activity
43+
/// all pending [checkOut] futures will throw an exception. This is indented
44+
/// to avoid deadlocks.
45+
Pool(this._maxCheckedOutResources, {Duration timeout})
46+
: _timeout = timeout;
47+
48+
/// Check out a [PoolResource].
49+
///
50+
/// If the maximum number of resources is already checked out, this will delay
51+
/// until one of them is released.
52+
Future<PoolResource> checkOut() {
53+
if (_checkedOutResources < _maxCheckedOutResources) {
54+
_checkedOutResources++;
55+
return new Future.value(new PoolResource._(this));
56+
} else {
57+
var completer = new Completer<PoolResource>();
58+
_pendingResources.add(completer);
59+
_heartbeat();
60+
return completer.future;
61+
}
62+
}
63+
64+
/// Checks out a resource for the duration of [callback], which may return a
65+
/// Future.
66+
///
67+
/// The return value of [callback] is piped to the returned Future.
68+
Future withResource(callback()) {
69+
return checkOut().then((resource) =>
70+
new Future.sync(callback).whenComplete(resource.release));
71+
}
72+
73+
/// If there are any pending checkouts, this will fire the oldest one.
74+
void _onResourceReleased() {
75+
if (_pendingResources.isEmpty) {
76+
_checkedOutResources--;
77+
if (_timer != null) {
78+
_timer.cancel();
79+
_timer = null;
80+
}
81+
return;
82+
}
83+
84+
_heartbeat();
85+
var pending = _pendingResources.removeFirst();
86+
pending.complete(new PoolResource._(this));
87+
}
88+
89+
/// Indicates that some external action has occurred and the timer should be
90+
/// restarted.
91+
void _heartbeat() {
92+
if (_timer != null) _timer.cancel();
93+
if (_timeout == null) {
94+
_timer = null;
95+
} else {
96+
_timer = new Timer(_timeout, _onTimeout);
97+
}
98+
}
99+
100+
/// Handles [_timer] timing out by causing all pending resource completers to
101+
/// emit exceptions.
102+
void _onTimeout() {
103+
for (var completer in _pendingResources) {
104+
completer.completeException("Pool deadlock: all resources have been "
105+
"checked out for too long.", new Trace.current().vmTrace);
106+
}
107+
_pendingResources.clear();
108+
_timer = null;
109+
}
110+
}
111+
112+
/// A member of a [Pool].
113+
///
114+
/// A [PoolResource] is a token that indicates that a resource is allocated.
115+
/// When the associated resource is released, the user should call [release].
116+
class PoolResource {
117+
final Pool _pool;
118+
119+
/// Whether [this] has been released yet.
120+
bool _released = false;
121+
122+
PoolResource._(this._pool);
123+
124+
/// Tells the parent [Pool] that the resource associated with this resource is
125+
/// no longer allocated, and that a new [PoolResource] may be checked out.
126+
void release() {
127+
if (_released) {
128+
throw new StateError("A PoolResource may only be released once.");
129+
}
130+
_released = true;
131+
_pool._onResourceReleased();
132+
}
133+
}

pkg/barback/lib/src/transform_node.dart

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,10 @@ class TransformNode {
127127
_inputSubscriptions.clear();
128128

129129
_isDirty = false;
130-
return transformer.apply(transform).catchError((error) {
130+
131+
return phase.cascade.graph.transformPool
132+
.withResource(() => transformer.apply(transform))
133+
.catchError((error) {
131134
// If the transform became dirty while processing, ignore any errors from
132135
// it.
133136
if (_isDirty) return;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
library barback.test.package_graph.transform_test;
6+
7+
import 'package:barback/src/utils.dart';
8+
import 'package:scheduled_test/scheduled_test.dart';
9+
10+
import '../utils.dart';
11+
12+
main() {
13+
initConfig();
14+
15+
test("handles many parallel transformers", () {
16+
var files = new List.generate(100, (i) => "app|$i.txt");
17+
var rewrite = new RewriteTransformer("txt", "out");
18+
initGraph(files, {"app": [[rewrite]]});
19+
20+
// Pause and resume apply to simulate parallel long-running transformers.
21+
rewrite.pauseApply();
22+
updateSources(files);
23+
schedule(pumpEventQueue);
24+
rewrite.resumeApply();
25+
26+
for (var i = 0; i < 100; i++) {
27+
expectAsset("app|$i.out", "$i.out");
28+
}
29+
buildShouldSucceed();
30+
31+
expect(rewrite.maxParallelRuns, completion(equals(10)));
32+
});
33+
}

pkg/barback/test/transformer/mock.dart

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ abstract class MockTransformer extends Transformer {
2727
Future<int> get numRuns => schedule(() => _numRuns);
2828
var _numRuns = 0;
2929

30+
Future<int> get maxParallelRuns => schedule(() => _maxParallelRuns);
31+
var _maxParallelRuns = 0;
32+
3033
/// The number of currently running transforms.
3134
int _runningTransforms = 0;
3235

@@ -176,6 +179,9 @@ abstract class MockTransformer extends Transformer {
176179
_numRuns++;
177180
if (_runningTransforms == 0) _started.complete();
178181
_runningTransforms++;
182+
if (_runningTransforms > _maxParallelRuns) {
183+
_maxParallelRuns = _runningTransforms;
184+
}
179185
return newFuture(() => doApply(transform)).then((_) {
180186
if (_apply != null) return _apply.future;
181187
}).whenComplete(() {

0 commit comments

Comments
 (0)