Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/internal/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ export default function queue(worker, concurrency, payload) {
});

task.callback.apply(task, args);

if (args[0] != null) {
q.error(args[0], task.data);
}
});

if (workers <= (q.concurrency - q.buffer) ) {
Expand All @@ -85,6 +89,7 @@ export default function queue(worker, concurrency, payload) {
buffer: concurrency / 4,
empty: noop,
drain: noop,
error: noop,
started: false,
paused: false,
push: function (data, callback) {
Expand Down
2 changes: 2 additions & 0 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import queue from './internal/queue';
* from the `queue` is given to a `worker`.
* @property {Function} drain - a callback that is called when the last item
* from the `queue` has returned from the `worker`.
* @property {Function} error - a callback that is called when a task errors.
* Has the signature `function(error, task)`.
* @property {boolean} paused - a boolean for determining whether the queue is
* in a paused state.
* @property {Function} pause - a function that pauses the processing of tasks
Expand Down
27 changes: 27 additions & 0 deletions mocha_test/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,33 @@ describe('queue', function(){
});
});

it('global error handler', function(done){
var results = [];

var q = async.queue(function (task, callback) {
callback(task.name === 'foo' ? new Error('fooError') : null);
}, 2);

q.error = function(error, task) {
expect(error).to.exist;
expect(error.message).to.equal('fooError');
expect(task.name).to.equal('foo');
results.push('fooError');
};

q.drain = function() {
expect(results).to.eql(['fooError', 'bar']);
done();
};

q.push({name: 'foo'});

q.push({name: 'bar'}, function(error) {
expect(error).to.not.exist;
results.push('bar');
});
});

// The original queue implementation allowed the concurrency to be changed only
// on the same event loop during which a task was added to the queue. This
// test attempts to be a more robust test.
Expand Down