Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 0 additions & 39 deletions src/ReadableStream.php

This file was deleted.

1 change: 1 addition & 0 deletions src/ThroughStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -185,5 +185,6 @@ public function close()
$this->callback = null;

$this->emit('close');
$this->removeAllListeners();
}
}
40 changes: 0 additions & 40 deletions src/WritableStream.php

This file was deleted.

28 changes: 12 additions & 16 deletions tests/CompositeStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
namespace React\Tests\Stream;

use React\Stream\CompositeStream;
use React\Stream\ReadableStream;
use React\Stream\WritableStream;
use React\Stream\ThroughStream;

/**
* @covers React\Stream\CompositeStream
Expand Down Expand Up @@ -105,8 +104,8 @@ public function closeShouldCloseBothStreams()
/** @test */
public function itShouldForwardCloseOnlyOnce()
{
$readable = new ReadableStream();
$writable = new WritableStream();
$readable = new ThroughStream();
$writable = new ThroughStream();

$composite = new CompositeStream($readable, $writable);
$composite->on('close', $this->expectCallableOnce());
Expand All @@ -118,8 +117,8 @@ public function itShouldForwardCloseOnlyOnce()
/** @test */
public function itShouldReceiveForwardedEvents()
{
$readable = new ReadableStream();
$writable = new WritableStream();
$readable = new ThroughStream();
$writable = new ThroughStream();

$composite = new CompositeStream($readable, $writable);
$composite->on('data', $this->expectCallableOnce());
Expand All @@ -142,7 +141,7 @@ public function itShouldHandlePipingCorrectly()

$composite = new CompositeStream($readable, $writable);

$input = new ReadableStream();
$input = new ThroughStream();
$input->pipe($composite);
$input->emit('data', array('foo'));
}
Expand All @@ -157,7 +156,7 @@ public function itShouldForwardPauseUpstreamWhenPipedTo()

$composite = new CompositeStream($readable, $writable);

$input = $this->getMockBuilder('React\Stream\ReadableStream')->setMethods(array('pause', 'resume'))->getMock();
$input = $this->getMockBuilder('React\Stream\ThroughStream')->setMethods(array('pause', 'resume'))->getMock();
$input
->expects($this->once())
->method('pause');
Expand All @@ -176,7 +175,7 @@ public function itShouldForwardResumeUpstreamWhenPipedTo()

$composite = new CompositeStream($readable, $writable);

$input = $this->getMockBuilder('React\Stream\ReadableStream')->setMethods(array('pause', 'resume'))->getMock();
$input = $this->getMockBuilder('React\Stream\ThroughStream')->setMethods(array('pause', 'resume'))->getMock();
$input
->expects($this->once())
->method('resume');
Expand All @@ -189,15 +188,12 @@ public function itShouldForwardResumeUpstreamWhenPipedTo()
public function itShouldForwardPauseAndResumeUpstreamWhenPipedTo()
{
$readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
$writable = $this->getMockBuilder('React\Stream\WritableStream')->setMethods(array('write'))->getMock();
$writable
->expects($this->once())
->method('write')
->will($this->returnValue(false));
$writable = new ThroughStream();
$writable->pause();

$composite = new CompositeStream($readable, $writable);

$input = $this->getMockBuilder('React\Stream\ReadableStream')->setMethods(array('pause', 'resume'))->getMock();
$input = $this->getMockBuilder('React\Stream\ThroughStream')->setMethods(array('pause', 'resume'))->getMock();
$input
->expects($this->once())
->method('pause');
Expand All @@ -213,7 +209,7 @@ public function itShouldForwardPauseAndResumeUpstreamWhenPipedTo()
/** @test */
public function itShouldForwardPipeCallsToReadableStream()
{
$readable = new ReadableStream();
$readable = new ThroughStream();
$writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
$writable->expects($this->any())->method('isWritable')->willReturn(True);
$composite = new CompositeStream($readable, $writable);
Expand Down
67 changes: 0 additions & 67 deletions tests/ReadableStreamTest.php

This file was deleted.

3 changes: 1 addition & 2 deletions tests/ThroughStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace React\Tests\Stream;

use React\Stream\ReadableStream;
use React\Stream\ThroughStream;

/**
Expand Down Expand Up @@ -122,7 +121,7 @@ public function itShouldReturnTrueForAnyDataWrittenToItWhenResumedAfterPause()
/** @test */
public function pipingStuffIntoItShouldWork()
{
$readable = new ReadableStream();
$readable = new ThroughStream();

$through = new ThroughStream();
$through->on('data', $this->expectCallableOnceWith('foo'));
Expand Down
21 changes: 10 additions & 11 deletions tests/UtilTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
namespace React\Tests\Stream;

use React\Stream\WritableResourceStream;
use React\Stream\ReadableStream;
use React\Stream\Util;
use React\Stream\WritableStream;
use React\Stream\CompositeStream;
use React\Stream\ThroughStream;

/**
* @covers React\Stream\Util
Expand Down Expand Up @@ -77,7 +76,7 @@ public function testPipeClosingDestPausesSource()
->expects($this->once())
->method('pause');

$writable = new WritableStream();
$writable = new ThroughStream();

Util::pipe($readable, $writable);

Expand Down Expand Up @@ -191,8 +190,8 @@ public function testPipeWithWritableResourceStream()

public function testPipeSetsUpListeners()
{
$source = new ReadableStream();
$dest = new WritableStream();
$source = new ThroughStream();
$dest = new ThroughStream();

$this->assertCount(0, $source->listeners('data'));
$this->assertCount(0, $source->listeners('end'));
Expand All @@ -207,8 +206,8 @@ public function testPipeSetsUpListeners()

public function testPipeClosingSourceRemovesListeners()
{
$source = new ReadableStream();
$dest = new WritableStream();
$source = new ThroughStream();
$dest = new ThroughStream();

Util::pipe($source, $dest);

Expand All @@ -221,8 +220,8 @@ public function testPipeClosingSourceRemovesListeners()

public function testPipeClosingDestRemovesListeners()
{
$source = new ReadableStream();
$dest = new WritableStream();
$source = new ThroughStream();
$dest = new ThroughStream();

Util::pipe($source, $dest);

Expand Down Expand Up @@ -251,8 +250,8 @@ public function testPipeDuplexIntoSelfEndsOnEnd()
/** @test */
public function forwardEventsShouldSetupForwards()
{
$source = new ReadableStream();
$target = new ReadableStream();
$source = new ThroughStream();
$target = new ThroughStream();

Util::forwardEvents($source, $target, array('data'));
$target->on('data', $this->expectCallableOnce());
Expand Down
Loading