From 5ba98126e795f95554a9badcfe46a2e696eee180 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Fri, 3 Apr 2015 19:58:14 +1100 Subject: [PATCH] use AbstractOnSubscribe for StringObservable.from(InputStream) and from(Reader) --- .../operators/OnSubscribeInputStream.java | 42 +++++++ .../internal/operators/OnSubscribeReader.java | 39 ++++++ .../java/rx/observables/StringObservable.java | 86 ++++--------- .../rx/observables/StringObservableTest.java | 115 ++++++++++++------ 4 files changed, 179 insertions(+), 103 deletions(-) create mode 100644 src/main/java/rx/internal/operators/OnSubscribeInputStream.java create mode 100644 src/main/java/rx/internal/operators/OnSubscribeReader.java diff --git a/src/main/java/rx/internal/operators/OnSubscribeInputStream.java b/src/main/java/rx/internal/operators/OnSubscribeInputStream.java new file mode 100644 index 0000000..5f82221 --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeInputStream.java @@ -0,0 +1,42 @@ +package rx.internal.operators; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; + +import rx.Subscriber; +import rx.observables.AbstractOnSubscribe; + +public final class OnSubscribeInputStream extends AbstractOnSubscribe { + + private final InputStream is; + private final int size; + + public OnSubscribeInputStream(InputStream is, int size) { + this.is = is; + this.size = size; + } + + @Override + protected InputStream onSubscribe(Subscriber subscriber) { + return is; + } + + @Override + protected void next(SubscriptionState state) { + + InputStream is = state.state(); + byte[] buffer = new byte[size]; + try { + int count = is.read(buffer); + if (count == -1) + state.onCompleted(); + else if (count < size) + state.onNext(Arrays.copyOf(buffer, count)); + else + state.onNext(buffer); + } catch (IOException e) { + state.onError(e); + } + } +} diff --git a/src/main/java/rx/internal/operators/OnSubscribeReader.java b/src/main/java/rx/internal/operators/OnSubscribeReader.java new file mode 100644 index 0000000..75b5ee4 --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeReader.java @@ -0,0 +1,39 @@ +package rx.internal.operators; + +import java.io.IOException; +import java.io.Reader; + +import rx.Subscriber; +import rx.observables.AbstractOnSubscribe; + +public final class OnSubscribeReader extends AbstractOnSubscribe { + + private final Reader reader; + private final int size; + + public OnSubscribeReader(Reader reader, int size) { + this.reader = reader; + this.size = size; + } + + @Override + protected Reader onSubscribe(Subscriber subscriber) { + return reader; + } + + @Override + protected void next(SubscriptionState state) { + + Reader reader = state.state(); + char[] buffer = new char[size]; + try { + int count = reader.read(buffer); + if (count == -1) + state.onCompleted(); + else + state.onNext(String.valueOf(buffer, 0, count)); + } catch (IOException e) { + state.onError(e); + } + } +} diff --git a/src/main/java/rx/observables/StringObservable.java b/src/main/java/rx/observables/StringObservable.java index 5a9a9b2..8beecce 100644 --- a/src/main/java/rx/observables/StringObservable.java +++ b/src/main/java/rx/observables/StringObservable.java @@ -16,13 +16,14 @@ package rx.observables; import rx.Observable; -import rx.Observable.OnSubscribe; import rx.Observable.Operator; import rx.Subscriber; import rx.functions.Action1; import rx.functions.Func0; import rx.functions.Func1; import rx.functions.Func2; +import rx.internal.operators.OnSubscribeInputStream; +import rx.internal.operators.OnSubscribeReader; import java.io.Closeable; import java.io.IOException; @@ -42,8 +43,8 @@ public class StringObservable { /** - * Reads from the bytes from a source {@link InputStream} and outputs {@link Observable} of - * {@code byte[]}s + * Reads bytes from a source {@link InputStream} and outputs {@link Observable} of + * {@code byte[]}s. Supports backpressure. *

* * @@ -103,44 +104,24 @@ public void call(S resource) { } /** - * Reads from the bytes from a source {@link InputStream} and outputs {@link Observable} of - * {@code byte[]}s + * Reads bytes from a source {@link InputStream} and outputs {@link Observable} of + * {@code byte[]}s. Supports backpressure. *

* * - * @param i + * @param is * Source {@link InputStream} * @param size * internal buffer size * @return the Observable containing read byte arrays from the input */ - public static Observable from(final InputStream i, final int size) { - return Observable.create(new OnSubscribe() { - @Override - public void call(Subscriber o) { - byte[] buffer = new byte[size]; - try { - if (o.isUnsubscribed()) - return; - int n = i.read(buffer); - while (n != -1 && !o.isUnsubscribed()) { - o.onNext(Arrays.copyOf(buffer, n)); - if (!o.isUnsubscribed()) - n = i.read(buffer); - } - } catch (IOException e) { - o.onError(e); - } - if (o.isUnsubscribed()) - return; - o.onCompleted(); - } - }); + public static Observable from(final InputStream is, final int size) { + return Observable.create(new OnSubscribeInputStream(is, size)); } /** - * Reads from the characters from a source {@link Reader} and outputs {@link Observable} of - * {@link String}s + * Reads characters from a source {@link Reader} and outputs {@link Observable} of + * {@link String}s. Supports backpressure. *

* * @@ -153,8 +134,8 @@ public static Observable from(final Reader i) { } /** - * Reads from the characters from a source {@link Reader} and outputs {@link Observable} of - * {@link String}s + * Reads characters from a source {@link Reader} and outputs {@link Observable} of + * {@link String}s. Supports backpressure. *

* * @@ -164,33 +145,12 @@ public static Observable from(final Reader i) { * internal buffer size * @return the Observable of Strings read from the source */ - public static Observable from(final Reader i, final int size) { - return Observable.create(new OnSubscribe() { - @Override - public void call(Subscriber o) { - char[] buffer = new char[size]; - try { - if (o.isUnsubscribed()) - return; - int n = 0; - n = i.read(buffer); - while (n != -1 && !o.isUnsubscribed()) { - o.onNext(new String(buffer, 0, n)); - if (!o.isUnsubscribed()) - n = i.read(buffer); - } - } catch (IOException e) { - o.onError(e); - } - if (o.isUnsubscribed()) - return; - o.onCompleted(); - } - }); + public static Observable from(final Reader reader, final int size) { + return Observable.create(new OnSubscribeReader(reader, size)); } /** - * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams + * Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams * and where handles when a multibyte character spans two chunks. *

* @@ -204,7 +164,7 @@ public static Observable decode(Observable src, String charsetNa } /** - * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams + * Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams * and where handles when a multibyte character spans two chunks. *

* @@ -218,8 +178,8 @@ public static Observable decode(Observable src, Charset charset) } /** - * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams - * and where it handles when a multibyte character spans two chunks. + * Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams + * and handles when a multibyte character spans two chunks. * This method allows for more control over how malformed and unmappable characters are handled. *

* @@ -311,7 +271,7 @@ public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) { } /** - * Encodes a possible infinite stream of strings into a Observable of byte arrays. + * Encodes a possibly infinite stream of strings into an Observable of byte arrays. *

* * @@ -324,7 +284,7 @@ public static Observable encode(Observable src, String charsetNa } /** - * Encodes a possible infinite stream of strings into a Observable of byte arrays. + * Encodes a possibly infinite stream of strings into an Observable of byte arrays. *

* * @@ -337,7 +297,7 @@ public static Observable encode(Observable src, Charset charset) } /** - * Encodes a possible infinite stream of strings into a Observable of byte arrays. + * Encodes a possibly infinite stream of strings into an Observable of byte arrays. * This method allows for more control over how malformed and unmappable characters are handled. *

* @@ -548,7 +508,7 @@ public static Observable byLine(Observable source) { } /** - * Converts an String into an Observable that emits the chars in the String. + * Converts a String into an Observable that emits the chars in the String. *

* * diff --git a/src/test/java/rx/observables/StringObservableTest.java b/src/test/java/rx/observables/StringObservableTest.java index 6aac7ad..dca6bf6 100644 --- a/src/test/java/rx/observables/StringObservableTest.java +++ b/src/test/java/rx/observables/StringObservableTest.java @@ -26,8 +26,8 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static rx.observables.StringObservable.byLine; import static rx.observables.StringObservable.byCharacter; +import static rx.observables.StringObservable.byLine; import static rx.observables.StringObservable.decode; import static rx.observables.StringObservable.encode; import static rx.observables.StringObservable.from; @@ -62,7 +62,8 @@ public class StringObservableTest { @Test public void testMultibyteSpanningTwoBuffers() { - Observable src = Observable.just(new byte[] { (byte) 0xc2 }, new byte[] { (byte) 0xa1 }); + Observable src = Observable.just(new byte[] { (byte) 0xc2 }, + new byte[] { (byte) 0xa1 }); String out = StringObservable.decode(src, "UTF-8").toBlocking().single(); assertEquals("\u00A1", out); @@ -101,7 +102,7 @@ public void testMalformedInTheMiddleReport() { } @Test - public void testPropogateError() { + public void testPropagateError() { Observable src = Observable.just(new byte[] { 65 }); Observable err = Observable.error(new IOException()); CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder(); @@ -114,7 +115,7 @@ public void testPropogateError() { } @Test - public void testPropogateErrorInTheMiddleOfMultibyte() { + public void testPropagateErrorInTheMiddleOfMultibyte() { Observable src = Observable.just(new byte[] { (byte) 0xc2 }); Observable err = Observable.error(new IOException()); CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder(); @@ -128,13 +129,12 @@ public void testPropogateErrorInTheMiddleOfMultibyte() { @Test public void testEncode() { - assertArrayEquals( - new byte[] { (byte) 0xc2, (byte) 0xa1 }, encode(Observable.just("\u00A1"), "UTF-8") - .toBlocking().single()); + assertArrayEquals(new byte[] { (byte) 0xc2, (byte) 0xa1 }, + encode(Observable.just("\u00A1"), "UTF-8").toBlocking().single()); } @Test - public void testSplitOnCollon() { + public void testSplitOnColon() { testSplit("boo:and:foo", ":", 0, "boo", "and", "foo"); } @@ -145,16 +145,16 @@ public void testSplitOnOh() { @Test public void testSplitOnEmptyStream() { - assertEquals(0, (int) StringObservable.split(Observable.empty(), "\n") - .count().toBlocking().single()); + assertEquals(0, (int) StringObservable.split(Observable. empty(), "\n").count() + .toBlocking().single()); } - + @Test public void testSplitOnStreamThatThrowsExceptionImmediately() { RuntimeException ex = new RuntimeException("boo"); try { - StringObservable.split(Observable.error(ex), "\n") - .count().toBlocking().single(); + StringObservable.split(Observable. error(ex), "\n").count().toBlocking() + .single(); fail(); } catch (RuntimeException e) { assertEquals(ex, e); @@ -170,10 +170,12 @@ public void testSplit(String str, String regex, int limit, String... parts) { } } - public void testSplit(String message, String regex, int limit, Observable src, String... parts) { + public void testSplit(String message, String regex, int limit, Observable src, + String... parts) { Observable act = split(src, regex); Observable exp = Observable.from(parts); - AssertObservable.assertObservableEqualsBlocking("when input is " + message + " and limit = " + limit, exp, act); + AssertObservable.assertObservableEqualsBlocking("when input is " + message + + " and limit = " + limit, exp, act); } @Test @@ -258,8 +260,8 @@ public void testJoinEmpty() { @Test public void testJoinThrows() { - Observable source = Observable.concat(Observable.just("a"), Observable - . error(new RuntimeException("Forced failure"))); + Observable source = Observable.concat(Observable.just("a"), + Observable. error(new RuntimeException("Forced failure"))); Observable result = join(source, ", "); @@ -281,6 +283,23 @@ public void testFromInputStream() { assertArrayEquals(inBytes, outBytes); } + @Test + public void testFromEmptyInputStream() { + final byte[] inBytes = new byte[0]; + int count = from(new ByteArrayInputStream(inBytes)).count().toBlocking().single(); + assertEquals(0, count); + } + + @Test + public void testFromInputStreamUsesBufferSize() { + final byte[] inBytes = "test".getBytes(); + List list = from(new ByteArrayInputStream(inBytes), 2).toList().toBlocking() + .single(); + assertEquals(2, list.size()); + assertArrayEquals("te".getBytes(), list.get(0)); + assertArrayEquals("st".getBytes(), list.get(1)); + } + @Test public void testFromInputStreamWillUnsubscribeBeforeCallingNextRead() { final byte[] inBytes = "test".getBytes(); @@ -300,40 +319,56 @@ public synchronized int read(byte[] b, int off, int len) { @Test public void testFromReader() { final String inStr = "test"; - final String outStr = from(new StringReader(inStr)).toBlocking().single(); + String outStr = from(new StringReader(inStr)).toBlocking().single(); assertNotSame(inStr, outStr); assertEquals(inStr, outStr); } - - @Test - public void testFromReaderWillUnsubscribeBeforeCallingNextRead() { - final byte[] inBytes = "test".getBytes(); - final AtomicInteger numReads = new AtomicInteger(0); - ByteArrayInputStream is = new ByteArrayInputStream(inBytes) { - - @Override - public synchronized int read(byte[] b, int off, int len) { - numReads.incrementAndGet(); - return super.read(b, off, len); - } - }; - StringObservable.from(new InputStreamReader(is)).first().toBlocking() - .single(); - assertEquals(1, numReads.get()); - } + + @Test + public void testFromEmptyReader() { + int count = from(new StringReader("")).count().toBlocking().single(); + assertEquals(0, count); + } + + @Test + public void testFromReaderUsesBufferSize() { + List list = from(new StringReader("test"), 2).toList().toBlocking().single(); + assertEquals(2, list.size()); + assertEquals("te", list.get(0)); + assertEquals("st", list.get(1)); + } + + @Test + public void testFromReaderWillUnsubscribeBeforeCallingNextRead() { + final byte[] inBytes = "test".getBytes(); + final AtomicInteger numReads = new AtomicInteger(0); + ByteArrayInputStream is = new ByteArrayInputStream(inBytes) { + + @Override + public synchronized int read(byte[] b, int off, int len) { + numReads.incrementAndGet(); + return super.read(b, off, len); + } + }; + StringObservable.from(new InputStreamReader(is)).first().toBlocking().single(); + assertEquals(1, numReads.get()); + } @Test public void testByLine() { String newLine = System.getProperty("line.separator"); - List lines = byLine(Observable.from(Arrays.asList("qwer", newLine + "asdf" + newLine, "zx", "cv"))).toList().toBlocking().single(); + List lines = byLine( + Observable.from(Arrays.asList("qwer", newLine + "asdf" + newLine, "zx", "cv"))) + .toList().toBlocking().single(); assertEquals(Arrays.asList("qwer", "asdf", "zxcv"), lines); } - + @Test public void testByCharacter() { - List chars = byCharacter(Observable.from(Arrays.asList("foo", "bar"))).toList().toBlocking().single(); + List chars = byCharacter(Observable.from(Arrays.asList("foo", "bar"))).toList() + .toBlocking().single(); assertEquals(Arrays.asList("f", "o", "o", "b", "a", "r"), chars); } @@ -355,7 +390,7 @@ public Observable call(Reader reader) { } }).subscribe(subscriber); - assertArrayEquals(new String[]{"he","ll","o"}, subscriber.getOnNextEvents().toArray()); + assertArrayEquals(new String[] { "he", "ll", "o" }, subscriber.getOnNextEvents().toArray()); assertEquals(1, subscriber.getOnCompletedEvents().size()); assertEquals(0, subscriber.getOnErrorEvents().size()); @@ -414,7 +449,7 @@ public Observable call(Reader reader) { } }).take(1).subscribe(subscriber); - assertArrayEquals(new String[]{"he"}, subscriber.getOnNextEvents().toArray()); + assertArrayEquals(new String[] { "he" }, subscriber.getOnNextEvents().toArray()); assertEquals(1, subscriber.getOnNextEvents().size()); assertEquals(1, subscriber.getOnCompletedEvents().size()); assertEquals(0, subscriber.getOnErrorEvents().size());