Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,11 @@ $promise->then(function ($value) {
Useful functions for creating, joining, mapping and reducing collections of
promises.

All functions working on promise collections (like `all()`, `race()`, `some()`
etc.) support cancellation. This means, if you call `cancel()` on the returned
promise, all promises in the collection are cancelled. If the collection itself
is a promise which resolves to an array, this promise is also cancelled.

#### resolve()

```php
Expand Down
5 changes: 5 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
},
"files": ["src/functions_include.php"]
},
"autoload-dev": {
"psr-4": {
"React\\Promise\\": "tests/fixtures"
}
},
"extra": {
"branch-alias": {
"dev-master": "2.0-dev"
Expand Down
58 changes: 58 additions & 0 deletions src/CancellationQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?php

namespace React\Promise;

class CancellationQueue
{
private $started = false;

/**
* @var CancellablePromiseInterface[]
*/
private $queue = [];

public function __invoke()
{
if ($this->started) {
return;
}

$this->started = true;
$this->drain();
}

public function enqueue($promise)
{
if (!$promise instanceof CancellablePromiseInterface) {
return;
}

$length = array_push($this->queue, $promise);

if ($this->started && 1 === $length) {
$this->drain();
}
}

private function drain()
{
for ($i = key($this->queue); isset($this->queue[$i]); $i++) {
$promise = $this->queue[$i];

$exception = null;

try {
$promise->cancel();
} catch (\Exception $exception) {
}

unset($this->queue[$i]);

if ($exception) {
throw $exception;
}
}

$this->queue = [];
}
}
137 changes: 88 additions & 49 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,35 @@ function all($promisesOrValues)

function race($promisesOrValues)
{
return resolve($promisesOrValues)
->then(function ($array) {
if (!is_array($array) || !$array) {
return resolve();
}
$cancellationQueue = new CancellationQueue();
$cancellationQueue->enqueue($promisesOrValues);

return new Promise(function ($resolve, $reject, $notify) use ($promisesOrValues, $cancellationQueue) {
resolve($promisesOrValues)
->done(function ($array) use ($cancellationQueue, $resolve, $reject, $notify) {
if (!is_array($array) || !$array) {
$resolve();
return;
}

return new Promise(function ($resolve, $reject, $notify) use ($array) {
foreach ($array as $promiseOrValue) {
$cancellationQueue->enqueue($promiseOrValue);

$fulfiller = function ($value) use ($cancellationQueue, $resolve) {
$cancellationQueue();
$resolve($value);
};

$rejecter = function ($reason) use ($cancellationQueue, $reject) {
$cancellationQueue();
$reject($reason);
};

resolve($promiseOrValue)
->done($resolve, $reject, $notify);
->done($fulfiller, $rejecter, $notify);
}
});
});
}, $reject, $notify);
}, $cancellationQueue);
}

function any($promisesOrValues)
Expand All @@ -62,28 +78,33 @@ function any($promisesOrValues)

function some($promisesOrValues, $howMany)
{
return resolve($promisesOrValues)
->then(function ($array) use ($howMany) {
if (!is_array($array) || !$array || $howMany < 1) {
return resolve([]);
}
$cancellationQueue = new CancellationQueue();
$cancellationQueue->enqueue($promisesOrValues);

return new Promise(function ($resolve, $reject, $notify) use ($promisesOrValues, $howMany, $cancellationQueue) {
resolve($promisesOrValues)
->done(function ($array) use ($howMany, $cancellationQueue, $resolve, $reject, $notify) {
if (!is_array($array) || !$array || $howMany < 1) {
$resolve([]);
return;
}

return new Promise(function ($resolve, $reject, $notify) use ($array, $howMany) {
$len = count($array);
$toResolve = min($howMany, $len);
$toReject = ($len - $toResolve) + 1;
$values = [];
$reasons = [];

foreach ($array as $i => $promiseOrValue) {
$fulfiller = function ($val) use ($i, &$values, &$toResolve, $toReject, $resolve) {
$fulfiller = function ($val) use ($i, &$values, &$toResolve, $toReject, $resolve, $cancellationQueue) {
if ($toResolve < 1 || $toReject < 1) {
return;
}

$values[$i] = $val;

if (0 === --$toResolve) {
$cancellationQueue();
$resolve($values);
}
};
Expand All @@ -100,26 +121,34 @@ function some($promisesOrValues, $howMany)
}
};

$cancellationQueue->enqueue($promiseOrValue);

resolve($promiseOrValue)
->done($fulfiller, $rejecter, $notify);
}
});
});
}, $reject, $notify);
}, $cancellationQueue);
}

function map($promisesOrValues, callable $mapFunc)
{
return resolve($promisesOrValues)
->then(function ($array) use ($mapFunc) {
if (!is_array($array) || !$array) {
return resolve([]);
}
$cancellationQueue = new CancellationQueue();
$cancellationQueue->enqueue($promisesOrValues);

return new Promise(function ($resolve, $reject, $notify) use ($promisesOrValues, $mapFunc, $cancellationQueue) {
resolve($promisesOrValues)
->done(function ($array) use ($mapFunc, $cancellationQueue, $resolve, $reject, $notify) {
if (!is_array($array) || !$array) {
$resolve([]);
return;
}

return new Promise(function ($resolve, $reject, $notify) use ($array, $mapFunc) {
$toResolve = count($array);
$values = [];

foreach ($array as $i => $promiseOrValue) {
$cancellationQueue->enqueue($promiseOrValue);

resolve($promiseOrValue)
->then($mapFunc)
->done(
Expand All @@ -134,35 +163,45 @@ function ($mapped) use ($i, &$values, &$toResolve, $resolve) {
$notify
);
}
});
});
}, $reject, $notify);
}, $cancellationQueue);
}

function reduce($promisesOrValues, callable $reduceFunc, $initialValue = null)
{
return resolve($promisesOrValues)
->then(function ($array) use ($reduceFunc, $initialValue) {
if (!is_array($array)) {
$array = [];
}

$total = count($array);
$i = 0;

// Wrap the supplied $reduceFunc with one that handles promises and then
// delegates to the supplied.
$wrappedReduceFunc = function ($current, $val) use ($reduceFunc, $total, &$i) {
return resolve($current)
->then(function ($c) use ($reduceFunc, $total, &$i, $val) {
return resolve($val)
->then(function ($value) use ($reduceFunc, $total, &$i, $c) {
return $reduceFunc($c, $value, $i++, $total);
});
});
};

return array_reduce($array, $wrappedReduceFunc, $initialValue);
});
$cancellationQueue = new CancellationQueue();
$cancellationQueue->enqueue($promisesOrValues);

return new Promise(function ($resolve, $reject, $notify) use ($promisesOrValues, $reduceFunc, $initialValue, $cancellationQueue) {
resolve($promisesOrValues)
->done(function ($array) use ($reduceFunc, $initialValue, $cancellationQueue, $resolve, $reject, $notify) {
if (!is_array($array)) {
$array = [];
}

$total = count($array);
$i = 0;

// Wrap the supplied $reduceFunc with one that handles promises and then
// delegates to the supplied.
$wrappedReduceFunc = function ($current, $val) use ($reduceFunc, $cancellationQueue, $total, &$i) {
$cancellationQueue->enqueue($val);

return $current
->then(function ($c) use ($reduceFunc, $total, &$i, $val) {
return resolve($val)
->then(function ($value) use ($reduceFunc, $total, &$i, $c) {
return $reduceFunc($c, $value, $i++, $total);
});
});
};

$cancellationQueue->enqueue($initialValue);

array_reduce($array, $wrappedReduceFunc, resolve($initialValue))
->done($resolve, $reject, $notify);
}, $reject, $notify);
}, $cancellationQueue);
}

// Internal functions
Expand Down
85 changes: 85 additions & 0 deletions tests/CancellationQueueTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?php

namespace React\Promise;

class CancellationQueueTest extends TestCase
{
/** @test */
public function ignoresNonCancellablePromises()
{
$p = new SimpleFulfilledTestPromise();

$cancellationQueue = new CancellationQueue();
$cancellationQueue->enqueue($p);

$cancellationQueue();

$this->assertFalse($p->cancelCalled);
}

/** @test */
public function callsCancelOnPromisesEnqueuedBeforeStart()
{
$d1 = $this->getCancellableDeferred();
$d2 = $this->getCancellableDeferred();

$cancellationQueue = new CancellationQueue();
$cancellationQueue->enqueue($d1->promise());
$cancellationQueue->enqueue($d2->promise());

$cancellationQueue();
}

/** @test */
public function callsCancelOnPromisesEnqueuedAfterStart()
{
$d1 = $this->getCancellableDeferred();
$d2 = $this->getCancellableDeferred();

$cancellationQueue = new CancellationQueue();

$cancellationQueue();

$cancellationQueue->enqueue($d2->promise());
$cancellationQueue->enqueue($d1->promise());
}

/** @test */
public function doesNotCallCancelTwiceWhenStartedTwice()
{
$d = $this->getCancellableDeferred();

$cancellationQueue = new CancellationQueue();
$cancellationQueue->enqueue($d->promise());

$cancellationQueue();
$cancellationQueue();
}

/** @test */
public function rethrowsExceptionsThrownFromCancel()
{
$this->setExpectedException('\Exception', 'test');

$mock = $this->getMock('React\Promise\CancellablePromiseInterface');
$mock
->expects($this->once())
->method('cancel')
->will($this->throwException(new \Exception('test')));

$cancellationQueue = new CancellationQueue();
$cancellationQueue->enqueue($mock);

$cancellationQueue();
}

private function getCancellableDeferred()
{
$mock = $this->createCallableMock();
$mock
->expects($this->once())
->method('__invoke');

return new Deferred($mock);
}
}
Loading