diff --git a/README.md b/README.md index f56c660..75662e7 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ Interesting events emitted by Request: Interesting events emitted by Response: * `data`: Passes a chunk of the response body as first argument and a Response - object itself as second argument. + object itself as second argument. When a response encounters a chunked encoded response it will parse it transparently for the user of `Response` and removing the `Transfer-Encoding` header. * `error`: An error occurred. * `end`: The response has been fully received. If an error occurred, it is passed as first argument. @@ -55,6 +55,5 @@ $loop->run(); ## TODO * gzip content encoding -* chunked transfer encoding * keep-alive connections * following redirections diff --git a/src/ChunkedStreamDecoder.php b/src/ChunkedStreamDecoder.php new file mode 100644 index 0000000..d5cd8a9 --- /dev/null +++ b/src/ChunkedStreamDecoder.php @@ -0,0 +1,204 @@ +stream = $stream; + $this->stream->on('data', array($this, 'handleData')); + $this->stream->on('end', array($this, 'handleEnd')); + Util::forwardEvents($this->stream, $this, [ + 'error', + ]); + } + + /** @internal */ + public function handleData($data) + { + $this->buffer .= $data; + + do { + $bufferLength = strlen($this->buffer); + $continue = $this->iterateBuffer(); + $iteratedBufferLength = strlen($this->buffer); + } while ( + $continue && + $bufferLength !== $iteratedBufferLength && + $iteratedBufferLength > 0 + ); + + if ($this->buffer === false) { + $this->buffer = ''; + } + } + + protected function iterateBuffer() + { + if (strlen($this->buffer) <= 1) { + return false; + } + + if ($this->nextChunkIsLength) { + $crlfPosition = strpos($this->buffer, static::CRLF); + if ($crlfPosition === false && strlen($this->buffer) > 1024) { + $this->emit('error', [ + new Exception('Chunk length header longer then 1024 bytes'), + ]); + $this->close(); + return false; + } + if ($crlfPosition === false) { + return false; // Chunk header hasn't completely come in yet + } + $this->nextChunkIsLength = false; + $lengthChunk = substr($this->buffer, 0, $crlfPosition); + if (strpos($lengthChunk, ';') !== false) { + list($lengthChunk) = explode(';', $lengthChunk, 2); + } + if (dechex(hexdec($lengthChunk)) !== $lengthChunk) { + $this->emit('error', [ + new Exception('Unable to validate "' . $lengthChunk . '" as chunk length header'), + ]); + $this->close(); + return false; + } + $this->remainingLength = hexdec($lengthChunk); + $this->buffer = substr($this->buffer, $crlfPosition + 2); + return true; + } + + if ($this->remainingLength > 0) { + $chunkLength = $this->getChunkLength(); + if ($chunkLength === 0) { + return true; + } + $this->emit('data', array( + substr($this->buffer, 0, $chunkLength), + $this + )); + $this->remainingLength -= $chunkLength; + $this->buffer = substr($this->buffer, $chunkLength); + return true; + } + + $this->nextChunkIsLength = true; + $this->buffer = substr($this->buffer, 2); + + if (substr($this->buffer, 0, 3) === "0\r\n") { + $this->reachedEnd = true; + $this->emit('end'); + $this->close(); + return false; + } + return true; + } + + protected function getChunkLength() + { + $bufferLength = strlen($this->buffer); + + if ($bufferLength >= $this->remainingLength) { + return $this->remainingLength; + } + + return $bufferLength; + } + + public function pause() + { + $this->stream->pause(); + } + + public function resume() + { + $this->stream->resume(); + } + + public function isReadable() + { + return $this->stream->isReadable(); + } + + public function pipe(WritableStreamInterface $dest, array $options = array()) + { + Util::pipe($this, $dest, $options); + + return $dest; + } + + public function close() + { + $this->closed = true; + return $this->stream->close(); + } + + /** @internal */ + public function handleEnd() + { + if ($this->closed) { + return; + } + + if ($this->buffer === '' && $this->reachedEnd) { + $this->emit('end'); + $this->close(); + return; + } + + $this->emit( + 'error', + [ + new Exception('Stream ended with incomplete control code') + ] + ); + $this->close(); + } +} diff --git a/src/Response.php b/src/Response.php index 880d412..68cb3a6 100644 --- a/src/Response.php +++ b/src/Response.php @@ -33,10 +33,22 @@ public function __construct(DuplexStreamInterface $stream, $protocol, $version, $this->code = $code; $this->reasonPhrase = $reasonPhrase; $this->headers = $headers; + $normalizedHeaders = array_change_key_case($headers, CASE_LOWER); - $stream->on('data', array($this, 'handleData')); - $stream->on('error', array($this, 'handleError')); - $stream->on('end', array($this, 'handleEnd')); + if (isset($normalizedHeaders['transfer-encoding']) && strtolower($normalizedHeaders['transfer-encoding']) === 'chunked') { + $this->stream = new ChunkedStreamDecoder($stream); + + foreach ($this->headers as $key => $value) { + if (strcasecmp('transfer-encoding', $key) === 0) { + unset($this->headers[$key]); + break; + } + } + } + + $this->stream->on('data', array($this, 'handleData')); + $this->stream->on('error', array($this, 'handleError')); + $this->stream->on('end', array($this, 'handleEnd')); } public function getProtocol() diff --git a/tests/DecodeChunkedStreamTest.php b/tests/DecodeChunkedStreamTest.php new file mode 100644 index 0000000..e227542 --- /dev/null +++ b/tests/DecodeChunkedStreamTest.php @@ -0,0 +1,143 @@ + [ + ["4\r\nWiki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"], + ], + 'data-set-2' => [ + ["4\r\nWiki\r\n", "5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"], + ], + 'data-set-3' => [ + ["4\r\nWiki\r\n", "5\r\n", "pedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"], + ], + 'data-set-4' => [ + ["4\r\nWiki\r\n", "5\r\n", "pedia\r\ne\r\n in\r\n", "\r\nchunks.\r\n0\r\n\r\n"], + ], + 'data-set-5' => [ + ["4\r\n", "Wiki\r\n", "5\r\n", "pedia\r\ne\r\n in\r\n", "\r\nchunks.\r\n0\r\n\r\n"], + ], + 'data-set-6' => [ + ["4\r\n", "Wiki\r\n", "5\r\n", "pedia\r\ne; foo=[bar,beer,pool,cue,win,won]\r\n", " in\r\n", "\r\nchunks.\r\n0\r\n\r\n"], + ], + 'header-fields' => [ + ["4; foo=bar\r\n", "Wiki\r\n", "5\r\n", "pedia\r\ne\r\n", " in\r\n", "\r\nchunks.\r\n", "0\r\n\r\n"], + ], + 'character-for-charactrr' => [ + str_split("4\r\nWiki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"), + ], + 'extra-newline-in-wiki-character-for-chatacter' => [ + str_split("6\r\nWi\r\nki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"), + "Wi\r\nkipedia in\r\n\r\nchunks." + ], + 'extra-newline-in-wiki' => [ + ["6\r\nWi\r\n", "ki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"], + "Wi\r\nkipedia in\r\n\r\nchunks." + ], + ]; + } + + /** + * @test + * @dataProvider provideChunkedEncoding + */ + public function testChunkedEncoding(array $strings, $expected = "Wikipedia in\r\n\r\nchunks.") + { + $stream = new ThroughStream(); + $response = new ChunkedStreamDecoder($stream); + $buffer = ''; + $response->on('data', function ($data) use (&$buffer) { + $buffer .= $data; + }); + $response->on('error', function (Exception $exception) { + throw $exception; + }); + foreach ($strings as $string) { + $stream->write($string); + } + $this->assertSame($expected, $buffer); + } + + public function provideInvalidChunkedEncoding() + { + return [ + 'chunk-body-longer-than-header-suggests' => [ + ["4\r\nWiwot40n98w3498tw3049nyn039409t34\r\n", "ki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"], + ], + 'invalid-header-charactrrs' => [ + str_split("xyz\r\nWi\r\nki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n") + ], + 'header-chunk-to-long' => [ + str_split(str_repeat('a', 2015) . "\r\nWi\r\nki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n") + ], + ]; + } + + /** + * @test + * @dataProvider provideInvalidChunkedEncoding + * @expectedException Exception + */ + public function testInvalidChunkedEncoding(array $strings) + { + $stream = new ThroughStream(); + $response = new ChunkedStreamDecoder($stream); + $response->on('error', function (Exception $exception) { + throw $exception; + }); + foreach ($strings as $string) { + $stream->write($string); + } + } + + public function testHandleEnd() + { + $ended = false; + $stream = new ThroughStream(); + $response = new ChunkedStreamDecoder($stream); + $response->on('end', function () use (&$ended) { + $ended = true; + }); + + $stream->write("4\r\nWiki\r\n0\r\n\r\n"); + + $this->assertTrue($ended); + } + + public function testHandleEndIncomplete() + { + $exception = null; + $stream = new ThroughStream(); + $response = new ChunkedStreamDecoder($stream); + $response->on('error', function ($e) use (&$exception) { + $exception = $e; + }); + + $stream->end("4\r\nWiki"); + + $this->assertInstanceOf('Exception', $exception); + } + + public function testHandleEndTrailers() + { + $ended = false; + $stream = new ThroughStream(); + $response = new ChunkedStreamDecoder($stream); + $response->on('end', function () use (&$ended) { + $ended = true; + }); + + $stream->write("4\r\nWiki\r\n0\r\nabc: def\r\nghi: klm\r\n\r\n"); + + $this->assertTrue($ended); + } +} diff --git a/tests/ResponseTest.php b/tests/ResponseTest.php index 5b86e19..9ed4009 100644 --- a/tests/ResponseTest.php +++ b/tests/ResponseTest.php @@ -3,6 +3,7 @@ namespace React\Tests\HttpClient; use React\HttpClient\Response; +use React\Stream\ThroughStream; class ResponseTest extends TestCase { @@ -54,6 +55,13 @@ public function responseShouldEmitEndEventOnEnd() $response->handleData('some data'); $response->handleEnd(); + + $this->assertSame( + [ + 'Content-Type' => 'text/plain' + ], + $response->getHeaders() + ); } /** @test */ @@ -72,6 +80,47 @@ public function closedResponseShouldNotBeResumedOrPaused() $response->resume(); $response->pause(); + + $this->assertSame( + [ + 'content-type' => 'text/plain', + ], + $response->getHeaders() + ); + } + + /** @test */ + public function chunkedEncodingResponse() + { + $stream = new ThroughStream(); + $response = new Response( + $stream, + 'http', + '1.0', + '200', + 'ok', + [ + 'content-type' => 'text/plain', + 'transfer-encoding' => 'chunked', + ] + ); + + $buffer = ''; + $response->on('data', function ($data, $stream) use (&$buffer) { + $buffer.= $data; + }); + $this->assertSame('', $buffer); + $stream->write("4; abc=def\r\n"); + $this->assertSame('', $buffer); + $stream->write("Wiki\r\n"); + $this->assertSame('Wiki', $buffer); + + $this->assertSame( + [ + 'content-type' => 'text/plain', + ], + $response->getHeaders() + ); } }