Skip to content

use AbstractOnSubscribe for StringObservable.from(InputStream) and from(Reader) #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 12, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/main/java/rx/internal/operators/OnSubscribeInputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package rx.internal.operators;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

import rx.Subscriber;
import rx.observables.AbstractOnSubscribe;

public final class OnSubscribeInputStream extends AbstractOnSubscribe<byte[], InputStream> {

private final InputStream is;
private final int size;

public OnSubscribeInputStream(InputStream is, int size) {
this.is = is;
this.size = size;
}

@Override
protected InputStream onSubscribe(Subscriber<? super byte[]> subscriber) {
return is;
}

@Override
protected void next(SubscriptionState<byte[], InputStream> state) {

InputStream is = state.state();
byte[] buffer = new byte[size];
try {
int count = is.read(buffer);
if (count == -1)
state.onCompleted();
else if (count < size)
state.onNext(Arrays.copyOf(buffer, count));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd only copy if count < size.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, I'll adjust

else
state.onNext(buffer);
} catch (IOException e) {
state.onError(e);
}
}
}
39 changes: 39 additions & 0 deletions src/main/java/rx/internal/operators/OnSubscribeReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package rx.internal.operators;

import java.io.IOException;
import java.io.Reader;

import rx.Subscriber;
import rx.observables.AbstractOnSubscribe;

public final class OnSubscribeReader extends AbstractOnSubscribe<String, Reader> {

private final Reader reader;
private final int size;

public OnSubscribeReader(Reader reader, int size) {
this.reader = reader;
this.size = size;
}

@Override
protected Reader onSubscribe(Subscriber<? super String> subscriber) {
return reader;
}

@Override
protected void next(SubscriptionState<String, Reader> state) {

Reader reader = state.state();
char[] buffer = new char[size];
try {
int count = reader.read(buffer);
if (count == -1)
state.onCompleted();
else
state.onNext(String.valueOf(buffer, 0, count));
} catch (IOException e) {
state.onError(e);
}
}
}
86 changes: 23 additions & 63 deletions src/main/java/rx/observables/StringObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
package rx.observables;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.internal.operators.OnSubscribeInputStream;
import rx.internal.operators.OnSubscribeReader;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -42,8 +43,8 @@

public class StringObservable {
/**
* Reads from the bytes from a source {@link InputStream} and outputs {@link Observable} of
* {@code byte[]}s
* Reads bytes from a source {@link InputStream} and outputs {@link Observable} of
* {@code byte[]}s. Supports backpressure.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.from.png" alt="">
*
Expand Down Expand Up @@ -103,44 +104,24 @@ public void call(S resource) {
}

/**
* Reads from the bytes from a source {@link InputStream} and outputs {@link Observable} of
* {@code byte[]}s
* Reads bytes from a source {@link InputStream} and outputs {@link Observable} of
* {@code byte[]}s. Supports backpressure.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.from.png" alt="">
*
* @param i
* @param is
* Source {@link InputStream}
* @param size
* internal buffer size
* @return the Observable containing read byte arrays from the input
*/
public static Observable<byte[]> from(final InputStream i, final int size) {
return Observable.create(new OnSubscribe<byte[]>() {
@Override
public void call(Subscriber<? super byte[]> o) {
byte[] buffer = new byte[size];
try {
if (o.isUnsubscribed())
return;
int n = i.read(buffer);
while (n != -1 && !o.isUnsubscribed()) {
o.onNext(Arrays.copyOf(buffer, n));
if (!o.isUnsubscribed())
n = i.read(buffer);
}
} catch (IOException e) {
o.onError(e);
}
if (o.isUnsubscribed())
return;
o.onCompleted();
}
});
public static Observable<byte[]> from(final InputStream is, final int size) {
return Observable.create(new OnSubscribeInputStream(is, size));
}

/**
* Reads from the characters from a source {@link Reader} and outputs {@link Observable} of
* {@link String}s
* Reads characters from a source {@link Reader} and outputs {@link Observable} of
* {@link String}s. Supports backpressure.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.from.png" alt="">
*
Expand All @@ -153,8 +134,8 @@ public static Observable<String> from(final Reader i) {
}

/**
* Reads from the characters from a source {@link Reader} and outputs {@link Observable} of
* {@link String}s
* Reads characters from a source {@link Reader} and outputs {@link Observable} of
* {@link String}s. Supports backpressure.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.from.png" alt="">
*
Expand All @@ -164,33 +145,12 @@ public static Observable<String> from(final Reader i) {
* internal buffer size
* @return the Observable of Strings read from the source
*/
public static Observable<String> from(final Reader i, final int size) {
return Observable.create(new OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> o) {
char[] buffer = new char[size];
try {
if (o.isUnsubscribed())
return;
int n = 0;
n = i.read(buffer);
while (n != -1 && !o.isUnsubscribed()) {
o.onNext(new String(buffer, 0, n));
if (!o.isUnsubscribed())
n = i.read(buffer);
}
} catch (IOException e) {
o.onError(e);
}
if (o.isUnsubscribed())
return;
o.onCompleted();
}
});
public static Observable<String> from(final Reader reader, final int size) {
return Observable.create(new OnSubscribeReader(reader, size));
}

/**
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams
* Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams
* and where handles when a multibyte character spans two chunks.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.decode.png" alt="">
Expand All @@ -204,7 +164,7 @@ public static Observable<String> decode(Observable<byte[]> src, String charsetNa
}

/**
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams
* Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams
* and where handles when a multibyte character spans two chunks.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.decode.png" alt="">
Expand All @@ -218,8 +178,8 @@ public static Observable<String> decode(Observable<byte[]> src, Charset charset)
}

/**
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams
* and where it handles when a multibyte character spans two chunks.
* Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams
* and handles when a multibyte character spans two chunks.
* This method allows for more control over how malformed and unmappable characters are handled.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.decode.png" alt="">
Expand Down Expand Up @@ -311,7 +271,7 @@ public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) {
}

/**
* Encodes a possible infinite stream of strings into a Observable of byte arrays.
* Encodes a possibly infinite stream of strings into an Observable of byte arrays.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.encode.png" alt="">
*
Expand All @@ -324,7 +284,7 @@ public static Observable<byte[]> encode(Observable<String> src, String charsetNa
}

/**
* Encodes a possible infinite stream of strings into a Observable of byte arrays.
* Encodes a possibly infinite stream of strings into an Observable of byte arrays.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.encode.png" alt="">
*
Expand All @@ -337,7 +297,7 @@ public static Observable<byte[]> encode(Observable<String> src, Charset charset)
}

/**
* Encodes a possible infinite stream of strings into a Observable of byte arrays.
* Encodes a possibly infinite stream of strings into an Observable of byte arrays.
* This method allows for more control over how malformed and unmappable characters are handled.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.encode.png" alt="">
Expand Down Expand Up @@ -548,7 +508,7 @@ public static Observable<String> byLine(Observable<String> source) {
}

/**
* Converts an String into an Observable that emits the chars in the String.
* Converts a String into an Observable that emits the chars in the String.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/from.png" alt="">
*
Expand Down
Loading