Skip to content

Commit 3e45972

Browse files
committed
Do not pause connection stream to detect closed connections immediately
1 parent 8ce5d7a commit 3e45972

File tree

7 files changed

+61
-51
lines changed

7 files changed

+61
-51
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,8 @@ successfully, i.e. it was read until its expected end.
396396
The `error` event will be emitted in case the request stream contains invalid
397397
chunked data or the connection closes before the complete request stream has
398398
been received.
399-
The server will automatically `pause()` the connection instead of closing it.
399+
The server will automatically stop reading from the connection and discard all
400+
incoming data instead of closing it.
400401
A response message can still be sent (unless the connection is already closed).
401402

402403
A `close` event will be emitted after an `error` or `end` event.

examples/08-stream-response.php

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,26 @@
1212

1313
// Note how this example still uses `Server` instead of `StreamingServer`.
1414
// The `StreamingServer` is only required for streaming *incoming* requests.
15-
$server = new Server($loop,function (ServerRequestInterface $request) use ($loop) {
15+
$server = new Server(function (ServerRequestInterface $request) use ($loop) {
1616
if ($request->getMethod() !== 'GET' || $request->getUri()->getPath() !== '/') {
1717
return new Response(404);
1818
}
1919

2020
$stream = new ThroughStream();
2121

22+
// send some data every once in a while with periodic timer
2223
$timer = $loop->addPeriodicTimer(0.5, function () use ($stream) {
23-
$stream->emit('data', array(microtime(true) . PHP_EOL));
24+
$stream->write(microtime(true) . PHP_EOL);
2425
});
2526

26-
$loop->addTimer(5, function() use ($loop, $timer, $stream) {
27+
// demo for ending stream after a few seconds
28+
$loop->addTimer(5.0, function() use ($stream) {
29+
$stream->end();
30+
});
31+
32+
// stop timer if stream is closed (such as when connection is closed)
33+
$stream->on('close', function () use ($loop, $timer) {
2734
$loop->cancelTimer($timer);
28-
$stream->emit('end');
2935
});
3036

3137
return new Response(

src/Io/CloseProtectionStream.php

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
use React\Stream\WritableStreamInterface;
99

1010
/**
11-
* [Internal] Protects a given stream from actually closing and only pauses it instead.
11+
* [Internal] Protects a given stream from actually closing and only discards its incoming data instead.
1212
*
1313
* This is used internally to prevent the underlying connection from closing, so
1414
* that we can still send back a response over the same stream.
@@ -19,9 +19,10 @@ class CloseProtectionStream extends EventEmitter implements ReadableStreamInterf
1919
{
2020
private $input;
2121
private $closed = false;
22+
private $paused = false;
2223

2324
/**
24-
* @param ReadableStreamInterface $input stream that will be paused instead of closed on an 'close' event.
25+
* @param ReadableStreamInterface $input stream that will be discarded instead of closing it on an 'close' event.
2526
*/
2627
public function __construct(ReadableStreamInterface $input)
2728
{
@@ -44,6 +45,7 @@ public function pause()
4445
return;
4546
}
4647

48+
$this->paused = true;
4749
$this->input->pause();
4850
}
4951

@@ -53,6 +55,7 @@ public function resume()
5355
return;
5456
}
5557

58+
$this->paused = false;
5659
$this->input->resume();
5760
}
5861

@@ -71,16 +74,19 @@ public function close()
7174

7275
$this->closed = true;
7376

74-
$this->emit('close');
75-
76-
// 'pause' the stream avoids additional traffic transferred by this stream
77-
$this->input->pause();
78-
77+
// stop listening for incoming events
7978
$this->input->removeListener('data', array($this, 'handleData'));
8079
$this->input->removeListener('error', array($this, 'handleError'));
8180
$this->input->removeListener('end', array($this, 'handleEnd'));
8281
$this->input->removeListener('close', array($this, 'close'));
8382

83+
// resume the stream to ensure we discard everything from incoming connection
84+
if ($this->paused) {
85+
$this->paused = false;
86+
$this->input->resume();
87+
}
88+
89+
$this->emit('close');
8490
$this->removeAllListeners();
8591
}
8692

@@ -102,5 +108,4 @@ public function handleError(\Exception $e)
102108
{
103109
$this->emit('error', array($e));
104110
}
105-
106111
}

src/StreamingServer.php

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -432,9 +432,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
432432

433433
// Close response stream once connection closes.
434434
// Note that this TCP/IP close detection may take some time,
435-
// in particular this may only fire on a later read/write attempt
436-
// because we stop/pause reading from the connection once the
437-
// request has been processed.
435+
// in particular this may only fire on a later read/write attempt.
438436
$connection->on('close', array($body, 'close'));
439437

440438
$body->pipe($connection);

tests/FunctionalServerTest.php

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55
use Psr\Http\Message\ServerRequestInterface;
66
use React\Http\Middleware\LimitConcurrentRequestsMiddleware;
77
use React\Http\Middleware\RequestBodyBufferMiddleware;
8+
use React\Http\Response;
9+
use React\Http\StreamingServer;
810
use React\Socket\Server as Socket;
911
use React\EventLoop\Factory;
10-
use React\Http\StreamingServer;
1112
use Psr\Http\Message\RequestInterface;
1213
use React\Socket\Connector;
1314
use React\Socket\ConnectionInterface;
1415
use Clue\React\Block;
15-
use React\Http\Response;
1616
use React\Socket\SecureServer;
1717
use React\Promise;
1818
use React\Promise\Stream;
@@ -498,7 +498,7 @@ public function testRequestHandlerWillReceiveCloseEventIfConnectionClosesWhileSe
498498
$socket->close();
499499
}
500500

501-
public function testStreamFromRequestHandlerWillBeClosedIfConnectionClosesWhileSendingBody()
501+
public function testStreamFromRequestHandlerWillBeClosedIfConnectionClosesWhileSendingRequestBody()
502502
{
503503
$loop = Factory::create();
504504
$connector = new Connector($loop);
@@ -528,13 +528,12 @@ public function testStreamFromRequestHandlerWillBeClosedIfConnectionClosesWhileS
528528
$this->assertNull($ret);
529529
}
530530

531-
public function testStreamFromRequestHandlerWillBeClosedIfConnectionClosesButWillOnlyBeDetectedOnNextWrite()
531+
public function testStreamFromRequestHandlerWillBeClosedIfConnectionCloses()
532532
{
533533
$loop = Factory::create();
534534
$connector = new Connector($loop);
535535

536536
$stream = new ThroughStream();
537-
$stream->on('close', $this->expectCallableOnce());
538537

539538
$server = new StreamingServer(function (RequestInterface $request) use ($stream) {
540539
return new Response(200, array(), $stream);
@@ -543,27 +542,20 @@ public function testStreamFromRequestHandlerWillBeClosedIfConnectionClosesButWil
543542
$socket = new Socket(0, $loop);
544543
$server->listen($socket);
545544

546-
$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) use ($loop) {
545+
$connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) use ($loop) {
547546
$conn->write("GET / HTTP/1.0\r\n\r\n");
548547

549-
$loop->addTimer(0.1, function() use ($conn) {
550-
$conn->end();
548+
$loop->addTimer(0.1, function () use ($conn) {
549+
$conn->close();
551550
});
552-
553-
return Stream\buffer($conn);
554551
});
555552

556-
$response = Block\await($result, $loop, 1.0);
557-
558-
$stream->write('nope');
559-
Block\sleep(0.1, $loop);
560-
$stream->write('nope');
561-
Block\sleep(0.1, $loop);
562-
563-
$this->assertStringStartsWith("HTTP/1.0 200 OK", $response);
564-
$this->assertStringEndsWith("\r\n\r\n", $response);
553+
// await response stream to be closed
554+
$ret = Block\await(Stream\first($stream, 'close'), $loop, 1.0);
565555

566556
$socket->close();
557+
558+
$this->assertNull($ret);
567559
}
568560

569561
public function testUpgradeWithThroughStreamReturnsDataAsGiven()

tests/Io/CloseProtectionStreamTest.php

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88

99
class CloseProtectionStreamTest extends TestCase
1010
{
11-
public function testClosePausesTheInputStreamInsteadOfClosing()
11+
public function testCloseDoesNotCloseTheInputStream()
1212
{
1313
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->disableOriginalConstructor()->getMock();
14-
$input->expects($this->once())->method('pause');
14+
$input->expects($this->never())->method('pause');
15+
$input->expects($this->never())->method('resume');
1516
$input->expects($this->never())->method('close');
1617

1718
$protection = new CloseProtectionStream($input);
@@ -43,6 +44,17 @@ public function testResumeStreamWillResumeInputStream()
4344
$protection->resume();
4445
}
4546

47+
public function testCloseResumesInputStreamIfItWasPreviouslyPaused()
48+
{
49+
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
50+
$input->expects($this->once())->method('pause');
51+
$input->expects($this->once())->method('resume');
52+
53+
$protection = new CloseProtectionStream($input);
54+
$protection->pause();
55+
$protection->close();
56+
}
57+
4658
public function testInputStreamIsNotReadableAfterClose()
4759
{
4860
$input = new ThroughStream();
@@ -121,7 +133,8 @@ public function testEndWontBeEmittedAfterClose()
121133
public function testPauseAfterCloseHasNoEffect()
122134
{
123135
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
124-
$input->expects($this->once())->method('pause');
136+
$input->expects($this->never())->method('pause');
137+
$input->expects($this->never())->method('resume');
125138

126139
$protection = new CloseProtectionStream($input);
127140
$protection->on('data', $this->expectCallableNever());
@@ -134,7 +147,7 @@ public function testPauseAfterCloseHasNoEffect()
134147
public function testResumeAfterCloseHasNoEffect()
135148
{
136149
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
137-
$input->expects($this->once())->method('pause');
150+
$input->expects($this->never())->method('pause');
138151
$input->expects($this->never())->method('resume');
139152

140153
$protection = new CloseProtectionStream($input);

tests/StreamingServerTest.php

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ public function testRequestOptionsAbsoluteEvent()
497497
$this->assertSame('example.com', $requestAssertion->getHeaderLine('Host'));
498498
}
499499

500-
public function testRequestPauseWillbeForwardedToConnection()
500+
public function testRequestPauseWillBeForwardedToConnection()
501501
{
502502
$server = new StreamingServer(function (ServerRequestInterface $request) {
503503
$request->getBody()->pause();
@@ -517,7 +517,7 @@ public function testRequestPauseWillbeForwardedToConnection()
517517
$this->connection->emit('data', array($data));
518518
}
519519

520-
public function testRequestResumeWillbeForwardedToConnection()
520+
public function testRequestResumeWillBeForwardedToConnection()
521521
{
522522
$server = new StreamingServer(function (ServerRequestInterface $request) {
523523
$request->getBody()->resume();
@@ -532,13 +532,13 @@ public function testRequestResumeWillbeForwardedToConnection()
532532
$this->connection->emit('data', array($data));
533533
}
534534

535-
public function testRequestCloseWillPauseConnection()
535+
public function testRequestCloseWillNotCloseConnection()
536536
{
537537
$server = new StreamingServer(function (ServerRequestInterface $request) {
538538
$request->getBody()->close();
539539
});
540540

541-
$this->connection->expects($this->once())->method('pause');
541+
$this->connection->expects($this->never())->method('close');
542542

543543
$server->listen($this->socket);
544544
$this->socket->emit('connection', array($this->connection));
@@ -554,7 +554,8 @@ public function testRequestPauseAfterCloseWillNotBeForwarded()
554554
$request->getBody()->pause();
555555
});
556556

557-
$this->connection->expects($this->once())->method('pause');
557+
$this->connection->expects($this->never())->method('close');
558+
$this->connection->expects($this->never())->method('pause');
558559

559560
$server->listen($this->socket);
560561
$this->socket->emit('connection', array($this->connection));
@@ -570,7 +571,7 @@ public function testRequestResumeAfterCloseWillNotBeForwarded()
570571
$request->getBody()->resume();
571572
});
572573

573-
$this->connection->expects($this->once())->method('pause');
574+
$this->connection->expects($this->never())->method('close');
574575
$this->connection->expects($this->never())->method('resume');
575576

576577
$server->listen($this->socket);
@@ -1964,7 +1965,6 @@ public function testRequestInvalidChunkHeaderTooLongWillEmitErrorOnRequestStream
19641965
});
19651966

19661967
$this->connection->expects($this->never())->method('close');
1967-
$this->connection->expects($this->once())->method('pause');
19681968

19691969
$server->listen($this->socket);
19701970
$this->socket->emit('connection', array($this->connection));
@@ -1989,7 +1989,6 @@ public function testRequestInvalidChunkBodyTooLongWillEmitErrorOnRequestStream()
19891989
});
19901990

19911991
$this->connection->expects($this->never())->method('close');
1992-
$this->connection->expects($this->once())->method('pause');
19931992

19941993
$server->listen($this->socket);
19951994
$this->socket->emit('connection', array($this->connection));
@@ -2012,7 +2011,6 @@ public function testRequestUnexpectedEndOfRequestWithChunkedTransferConnectionWi
20122011
});
20132012

20142013
$this->connection->expects($this->never())->method('close');
2015-
$this->connection->expects($this->once())->method('pause');
20162014

20172015
$server->listen($this->socket);
20182016
$this->socket->emit('connection', array($this->connection));
@@ -2036,7 +2034,6 @@ public function testRequestInvalidChunkHeaderWillEmitErrorOnRequestStream()
20362034
});
20372035

20382036
$this->connection->expects($this->never())->method('close');
2039-
$this->connection->expects($this->once())->method('pause');
20402037

20412038
$server->listen($this->socket);
20422039
$this->socket->emit('connection', array($this->connection));
@@ -2059,7 +2056,6 @@ public function testRequestUnexpectedEndOfRequestWithContentLengthWillEmitErrorO
20592056
});
20602057

20612058
$this->connection->expects($this->never())->method('close');
2062-
$this->connection->expects($this->once())->method('pause');
20632059

20642060
$server->listen($this->socket);
20652061
$this->socket->emit('connection', array($this->connection));
@@ -2089,7 +2085,6 @@ public function testRequestWithoutBodyWillEmitEndOnRequestStream()
20892085
$request->getBody()->on('error', $errorEvent);
20902086
});
20912087

2092-
$this->connection->expects($this->once())->method('pause');
20932088
$this->connection->expects($this->never())->method('close');
20942089

20952090
$server->listen($this->socket);

0 commit comments

Comments
 (0)