Skip to content

Add backpressure support for StringObservable.from(InputStream) #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 98 additions & 16 deletions src/main/java/rx/observables/StringObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func0;
Expand All @@ -38,8 +39,10 @@
import java.nio.charset.CodingErrorAction;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;


public class StringObservable {
/**
* Reads from the bytes from a source {@link InputStream} and outputs {@link Observable} of
Expand Down Expand Up @@ -114,28 +117,106 @@ public void call(S resource) {
* internal buffer size
* @return the Observable containing read byte arrays from the input
*/
public static Observable<byte[]> from(final InputStream i, final int size) {
public static Observable<byte[]> from(final InputStream is, final int size) {
return Observable.create(new OnSubscribe<byte[]>() {
@Override
public void call(Subscriber<? super byte[]> o) {
public void call(Subscriber<? super byte[]> subscriber) {
subscriber.setProducer(new InputStreamProducer(is, subscriber, size));
}
});
}

private static class InputStreamProducer implements Producer {

private final AtomicLong requested = new AtomicLong(0);
private final InputStream is;
private final Subscriber<? super byte[]> subscriber;
private int size;

InputStreamProducer(InputStream is, Subscriber<? super byte[]> subscriber, int size) {
this.is = is;
this.subscriber = subscriber;
this.size = size;
}

@Override
public void request(long n) {
try {
if (requested.get() == Long.MAX_VALUE)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think RxJava's contract on request(n) isn't as strong as reactive-streams, therefore, request() should be thread-safe and non-racy. Here, if a concurrent request(1) and request(Long.MAX_VALUE); arrives, the one can start both cases and end up having negative request amount.

// already started with fast path
return;
else if (n == Long.MAX_VALUE) {
// fast path
requestAll();
} else
requestSome(n);
} catch (RuntimeException e) {
subscriber.onError(e);
} catch (IOException e) {
subscriber.onError(e);
}
}

private void requestAll() {
requested.set(Long.MAX_VALUE);
byte[] buffer = new byte[size];
try {
if (subscriber.isUnsubscribed())
return;
int n = is.read(buffer);
while (n != -1 && !subscriber.isUnsubscribed()) {
subscriber.onNext(Arrays.copyOf(buffer, n));
if (!subscriber.isUnsubscribed())
n = is.read(buffer);
}
} catch (IOException e) {
subscriber.onError(e);
}
if (subscriber.isUnsubscribed())
return;
subscriber.onCompleted();
}



private void requestSome(long n) throws IOException {
// back pressure path
// this algorithm copied roughly from
// rxjava/OnSubscribeFromIterable.java

long previousCount = requested.getAndAdd(n);
if (previousCount == 0) {
byte[] buffer = new byte[size];
try {
if (o.isUnsubscribed())
while (true) {
long r = requested.get();
long numToEmit = r;

//emit numToEmit

if (subscriber.isUnsubscribed())
return;
int n = i.read(buffer);
while (n != -1 && !o.isUnsubscribed()) {
o.onNext(Arrays.copyOf(buffer, n));
if (!o.isUnsubscribed())
n = i.read(buffer);
int numRead;
if (numToEmit>0)
numRead = is.read(buffer);
else
numRead = 0;
while (numRead != -1 && !subscriber.isUnsubscribed() && numToEmit>0) {
subscriber.onNext(Arrays.copyOf(buffer, numRead));
numToEmit--;
if (numToEmit >0 && !subscriber.isUnsubscribed())
numRead = is.read(buffer);
}
} catch (IOException e) {
o.onError(e);

//check if we have finished
if (numRead == -1 && !subscriber.isUnsubscribed())
subscriber.onCompleted();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A return is missing here; without it, the while loop would spin until the downstream actually unsubscribes.

else if (subscriber.isUnsubscribed())
return;
else if (requested.addAndGet(-r) == 0)
return;
}
if (o.isUnsubscribed())
return;
o.onCompleted();
}
});
}
}

/**
Expand Down Expand Up @@ -176,7 +257,8 @@ public void call(Subscriber<? super String> o) {
n = i.read(buffer);
while (n != -1 && !o.isUnsubscribed()) {
o.onNext(new String(buffer, 0, n));
n = i.read(buffer);
if (!o.isUnsubscribed())
n = i.read(buffer);
}
} catch (IOException e) {
o.onError(e);
Expand Down
86 changes: 86 additions & 0 deletions src/test/java/rx/observables/StringObservableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@
import static rx.observables.StringObservable.using;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringReader;
import java.nio.charset.Charset;
Expand All @@ -54,6 +56,7 @@
import rx.Observer;
import rx.functions.Func1;
import rx.observables.StringObservable.UnsafeFunc0;
import rx.Subscriber;
import rx.observers.TestObserver;
import rx.observers.TestSubscriber;

Expand Down Expand Up @@ -296,13 +299,96 @@ public synchronized int read(byte[] b, int off, int len) {
assertEquals(1, numReads.get());
}

@Test
public void testFromInputStreamWithBackpressureShouldTakeFourRequests() {
final byte[] inBytes = "tester".getBytes();
final ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
final AtomicInteger requestCount = new AtomicInteger(0);
subscribeToInputStream(inBytes, 2, outBytes, 1, requestCount);
assertArrayEquals(inBytes, outBytes.toByteArray());
assertEquals(4, requestCount.get());
}

@Test
public void testFromInputStreamWithBackpressureRequestingMoreThanExist() {
final byte[] inBytes = "tester".getBytes();
final ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
final AtomicInteger requestCount = new AtomicInteger(0);
subscribeToInputStream(inBytes, 32, outBytes, 200, requestCount);
assertArrayEquals(inBytes, outBytes.toByteArray());
assertEquals(2, requestCount.get());
}

@Test
public void testFromEmptyInputStreamWithBackpressure() {
final byte[] inBytes = "".getBytes();
final ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
final AtomicInteger requestCount = new AtomicInteger(0);
subscribeToInputStream(inBytes, 32, outBytes, 1, requestCount);
assertArrayEquals(inBytes, outBytes.toByteArray());
assertEquals(1, requestCount.get());
}

private static void subscribeToInputStream(byte[] inBytes, int bufferSize,
final ByteArrayOutputStream outBytes, final int requestSize,
final AtomicInteger requestCount) {

from(new ByteArrayInputStream(inBytes), bufferSize).subscribe(
new Subscriber<byte[]>() {

@Override
public void onStart() {
request(requestSize);
requestCount.incrementAndGet();
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
throw new RuntimeException(e);
}

@Override
public void onNext(byte[] t) {
try {
outBytes.write(t);
} catch (IOException e) {
throw new RuntimeException(e);
}
request(requestSize);
requestCount.incrementAndGet();
}
});
}

@Test
public void testFromReader() {
final String inStr = "test";
final String outStr = from(new StringReader(inStr)).toBlocking().single();
assertNotSame(inStr, outStr);
assertEquals(inStr, outStr);
}

@Test
public void testFromReaderWillUnsubscribeBeforeCallingNextRead() {
final byte[] inBytes = "test".getBytes();
final AtomicInteger numReads = new AtomicInteger(0);
ByteArrayInputStream is = new ByteArrayInputStream(inBytes) {

@Override
public synchronized int read(byte[] b, int off, int len) {
numReads.incrementAndGet();
return super.read(b, off, len);
}
};
StringObservable.from(new InputStreamReader(is)).first().toBlocking()
.single();
assertEquals(1, numReads.get());
}

@Test
public void testByLine() {
Expand Down