Skip to content

Commit 6b47b11

Browse files
authored
1.x: new fromAsync to bridge the callback world with the reactive (#4179)
* 1.x: new fromAsync to bridge the callback world with the reactive * Inline requested, fix buffer/latest accounting * Remove package qualifier
1 parent b9af588 commit 6b47b11

File tree

5 files changed

+1402
-2
lines changed

5 files changed

+1402
-2
lines changed

src/main/java/rx/AsyncEmitter.java

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx;
18+
19+
import rx.annotations.Experimental;
20+
21+
/**
22+
* Abstraction over a RxJava Subscriber that allows associating
23+
* a resource with it and exposes the current number of downstream
24+
* requested amount.
25+
* <p>
26+
* The onNext, onError and onCompleted methods should be called
27+
* in a sequential manner, just like the Observer's methods. The
28+
* other methods are threadsafe.
29+
*
30+
* @param <T> the value type to emit
31+
*/
32+
@Experimental
33+
public interface AsyncEmitter<T> extends Observer<T> {
34+
35+
/**
36+
* Sets a Subscription on this emitter; any previous Subscription
37+
* or Cancellation will be unsubscribed/cancelled.
38+
* @param s the subscription, null is allowed
39+
*/
40+
void setSubscription(Subscription s);
41+
42+
/**
43+
* Sets a Cancellable on this emitter; any previous Subscription
44+
* or Cancellation will be unsubscribed/cancelled.
45+
* @param c the cancellable resource, null is allowed
46+
*/
47+
void setCancellation(Cancellable c);
48+
/**
49+
* The current outstanding request amount.
50+
* <p>This method it threadsafe.
51+
* @return the current outstanding request amount
52+
*/
53+
long requested();
54+
55+
/**
56+
* A functional interface that has a single close method
57+
* that can throw.
58+
*/
59+
interface Cancellable {
60+
61+
/**
62+
* Cancel the action or free a resource.
63+
* @throws Exception on error
64+
*/
65+
void cancel() throws Exception;
66+
}
67+
68+
/**
69+
* Options to handle backpressure in the emitter.
70+
*/
71+
enum BackpressureMode {
72+
NONE,
73+
74+
ERROR,
75+
76+
BUFFER,
77+
78+
DROP,
79+
80+
LATEST
81+
}
82+
}

src/main/java/rx/Observable.java

+44
Original file line numberDiff line numberDiff line change
@@ -1680,6 +1680,50 @@ public static <T> Observable<T> from(T[] array) {
16801680
return create(new OnSubscribeFromArray<T>(array));
16811681
}
16821682

1683+
/**
1684+
* Provides an API (via a cold Observable) that bridges the reactive world with the callback-style,
1685+
* generally non-backpressured world.
1686+
* <p>
1687+
* Example:
1688+
* <pre><code>
1689+
* Observable.&lt;Event&gt;fromAsync(emitter -&gt; {
1690+
* Callback listener = new Callback() {
1691+
* &#64;Override
1692+
* public void onEvent(Event e) {
1693+
* emitter.onNext(e);
1694+
* if (e.isLast()) {
1695+
* emitter.onCompleted();
1696+
* }
1697+
* }
1698+
*
1699+
* &#64;Override
1700+
* public void onFailure(Exception e) {
1701+
* emitter.onError(e);
1702+
* }
1703+
* };
1704+
*
1705+
* AutoCloseable c = api.someMethod(listener);
1706+
*
1707+
* emitter.setCancellable(c::close);
1708+
*
1709+
* }, BackpressureMode.BUFFER);
1710+
* </code></pre>
1711+
* <p>
1712+
* You should call the AsyncEmitter's onNext, onError and onCompleted methods in a serialized fashion. The
1713+
* rest of its methods are threadsafe.
1714+
*
1715+
* @param asyncEmitter the emitter that is called when a Subscriber subscribes to the returned {@code Observable}
1716+
* @param backpressure the backpressure mode to apply if the downstream Subscriber doesn't request (fast) enough
1717+
* @return the new Observable instance
1718+
* @see AsyncEmitter
1719+
* @see AsyncEmitter.BackpressureMode
1720+
* @see AsyncEmitter.Cancellable
1721+
*/
1722+
@Experimental
1723+
public static <T> Observable<T> fromAsync(Action1<AsyncEmitter<T>> asyncEmitter, AsyncEmitter.BackpressureMode backpressure) {
1724+
return create(new OnSubscribeFromAsync<T>(asyncEmitter, backpressure));
1725+
}
1726+
16831727
/**
16841728
* Returns an Observable that, when an observer subscribes to it, invokes a function you specify and then
16851729
* emits the value returned from that function.

src/main/java/rx/internal/operators/BackpressureUtils.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,14 @@ public static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T o
6969
}
7070

7171
/**
72-
* Adds {@code n} to {@code requested} and returns the value prior to addition once the
72+
* Adds {@code n} (not validated) to {@code requested} and returns the value prior to addition once the
7373
* addition is successful (uses CAS semantics). If overflows then sets
7474
* {@code requested} field to {@code Long.MAX_VALUE}.
7575
*
7676
* @param requested
7777
* atomic long that should be updated
7878
* @param n
79-
* the number of requests to add to the requested count
79+
* the number of requests to add to the requested count, positive (not validated)
8080
* @return requested value just prior to successful addition
8181
*/
8282
public static long getAndAddRequest(AtomicLong requested, long n) {
@@ -413,4 +413,17 @@ public static long produced(AtomicLong requested, long n) {
413413
}
414414
}
415415
}
416+
417+
/**
418+
* Validates the requested amount and returns true if it is positive.
419+
* @param n the requested amount
420+
* @return true if n is positive
421+
* @throws IllegalArgumentException if n is negative
422+
*/
423+
public static boolean validate(long n) {
424+
if (n < 0) {
425+
throw new IllegalArgumentException("n >= 0 required but it was " + n);
426+
}
427+
return n != 0L;
428+
}
416429
}

0 commit comments

Comments
 (0)