From 6aa0fe383e060a7a1220611ce0daf360ddae2c1c Mon Sep 17 00:00:00 2001 From: rahul Date: Sun, 10 Sep 2023 15:13:06 +0530 Subject: [PATCH] concurrent stream processing support Signed-off-by: rahul Signed-off-by: rahul --- src/Stream.php | 49 ----------- src/StreamHandler.php | 104 ++++++++++++++++++++++++ tests/Integration/StreamHandlerTest.php | 81 ++++++++++++++++++ tests/Integration/StreamTest.php | 62 -------------- 4 files changed, 185 insertions(+), 111 deletions(-) delete mode 100644 src/Stream.php create mode 100644 src/StreamHandler.php create mode 100644 tests/Integration/StreamHandlerTest.php delete mode 100644 tests/Integration/StreamTest.php diff --git a/src/Stream.php b/src/Stream.php deleted file mode 100644 index e7be8fd..0000000 --- a/src/Stream.php +++ /dev/null @@ -1,49 +0,0 @@ -streamUrl, 'r'); - if (!$stream) { - throw new StreamException(); - } - stream_set_blocking($stream, false); - - $outputFile = fopen($this->outputFilename, 'w'); - - try { - $content = $fiber->start($stream); - while (!$fiber->isTerminated()) { - fwrite($outputFile, $content); - - $content = $fiber->resume(); - } - } catch (Throwable $e) { - throw new StreamException(); - } finally { - fclose($stream); - fclose($outputFile); - } - } -} diff --git a/src/StreamHandler.php b/src/StreamHandler.php new file mode 100644 index 0000000..bf3c94b --- /dev/null +++ b/src/StreamHandler.php @@ -0,0 +1,104 @@ +streamUrls) { + throw new StreamException('No stream URLs provided.'); + } + } + + /** + */ + private function stream(string $streamUrl, string $outputFilename): Fiber + { + return new Fiber(function () use ($streamUrl, $outputFilename) { + $stream = fopen($streamUrl, 'r'); + if (!$stream) { + throw new StreamException("Failed to open stream: $streamUrl"); + } + stream_set_blocking($stream, false); + + $outputFile = fopen($outputFilename, 'w'); + + try { + while (!feof($stream)) { + $contents = fread($stream, $this->chunk); + fwrite($outputFile, $contents); + Fiber::suspend(); + } + } catch (Throwable $e) { + throw new StreamException(); + } finally { + fclose($stream); + fclose($outputFile); + } + }); + } + + /** + */ + public function initiateConcurrentStreams(): self + { + foreach ($this->streamUrls as $outputFile => $streamUrl) { + $fiber = $this->stream($streamUrl, $outputFile); + + $this->fibers[] = $fiber; + } + + return $this; + } + + /** + * @throws StreamException + * @throws Throwable + */ + public function start(): self + { + if (!$this->fibers) { + throw new StreamException("No fibers available to start"); + } + + /** @var Fiber $fiber */ + foreach ($this->fibers as $fiber) { + $fiber->start(); + } + + return $this; + } + + /** + * @throws Throwable + */ + public function resume(bool $resumeOnce = false): void + { + if (!$this->fibers) { + throw new StreamException("No fibers are currently running"); + } + + /** @var Fiber $fiber */ + foreach ($this->fibers as $fiber) { + while (!$fiber->isTerminated()) { + $fiber->resume(); + if ($resumeOnce) { + break; + } + } + } + + $this->fibers = []; + } +} diff --git a/tests/Integration/StreamHandlerTest.php b/tests/Integration/StreamHandlerTest.php new file mode 100644 index 0000000..f19eb72 --- /dev/null +++ b/tests/Integration/StreamHandlerTest.php @@ -0,0 +1,81 @@ +initiateConcurrentStreams()->start()->resume(); + + foreach ($urls as $file => $url) { + $this->assertGreaterThan(0, filesize($file)); + $this->assertStringContainsString('', file_get_contents($file)); + $this->assertStringContainsString('', file_get_contents($file)); + } + } + + #[Test] + #[DataProvider('wrongStreamDataProvider')] + public function throwExceptionIfUrlIsInvalid($outputFile, $url) + { + $stream = new StreamHandler([$outputFile => $url]); + + + $this->expectException(StreamException::class); + $stream->initiateConcurrentStreams()->start()->resume(); + } + + #[Test] + public function throwExceptionIfEmptyDataProvided() + { + $this->expectException(StreamException::class); + $this->expectExceptionMessage('No stream URLs provided.'); + new StreamHandler([]); + } + + public static function wrongStreamDataProvider(): iterable + { + yield ["output.html", "https://gist.github"]; + } + + + public static function streamDataProvider(): iterable + { + yield [ + [ + "output.html" => + "https://gist.github.com/rcsofttech85/629b37d483c4796db7bdcb3704067631#file-gistfile1-txt", + + "output1.html" => "https://gist.github.com/rcsofttech85/f71f2454b1fc40a077cda14ef3097385#file-gistfile1-txt", + + + "output2.html" => "https://gist.github.com/rcsofttech85/79ab19f1502e72c95cfa97d5205fa47d#file-gistfile1-txt" + ] + ]; + } +} diff --git a/tests/Integration/StreamTest.php b/tests/Integration/StreamTest.php deleted file mode 100644 index 9bf9e8b..0000000 --- a/tests/Integration/StreamTest.php +++ /dev/null @@ -1,62 +0,0 @@ -startStreaming(); - - $this->assertGreaterThan(0, filesize($output)); - $this->assertStringContainsString('', file_get_contents($output)); - $this->assertStringContainsString('', file_get_contents($output)); - } - - #[Test] - #[DataProvider('wrongStreamDataProvider')] - public function throwExceptionIfUrlIsInvalid($output, $url) - { - $stream = new Stream($url, $output); - - $this->expectException(StreamException::class); - $stream->startStreaming(); - } - - public static function streamDataProvider(): iterable - { - yield [ - "output.html", - "https://gist.github.com/rcsofttech85/629b37d483c4796db7bdcb3704067631#file-gistfile1-txt" - ]; - } - - public static function wrongStreamDataProvider(): iterable - { - yield ["output.html", "https://gist.github"]; - } -}