Skip to content

Commit 2bcf76b

Browse files
Merge pull request ReactiveX#287 from benjchristensen/error-handling-and-validation
Error handling and validation
2 parents 81f78a0 + 5b1c115 commit 2bcf76b

File tree

4 files changed

+205
-51
lines changed

4 files changed

+205
-51
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 144 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import rx.subjects.Subject;
8383
import rx.subscriptions.BooleanSubscription;
8484
import rx.subscriptions.Subscriptions;
85+
import rx.util.OnErrorNotImplementedException;
8586
import rx.util.Range;
8687
import rx.util.Timestamped;
8788
import rx.util.functions.Action0;
@@ -116,14 +117,10 @@ public class Observable<T> {
116117

117118
private final Func1<Observer<T>, Subscription> onSubscribe;
118119

119-
protected Observable() {
120-
this(null);
121-
}
122-
123120
/**
124-
* Construct an Observable with Function to execute when subscribed to.
121+
* Observable with Function to execute when subscribed to.
125122
* <p>
126-
* NOTE: Generally you're better off using {@link #create(Func1)} to create an Observable instead of using inheritance.
123+
* NOTE: Use {@link #create(Func1)} to create an Observable instead of this method unless you specifically have a need for inheritance.
127124
*
128125
* @param onSubscribe
129126
* {@link Func1} to be executed when {@link #subscribe(Observer)} is called.
@@ -132,6 +129,11 @@ protected Observable(Func1<Observer<T>, Subscription> onSubscribe) {
132129
this.onSubscribe = onSubscribe;
133130
}
134131

132+
protected Observable() {
133+
this(null);
134+
//TODO should this be made private to prevent it? It really serves no good purpose and only confuses things. Unit tests are incorrectly using it today
135+
}
136+
135137
/**
136138
* an {@link Observer} must call an Observable's <code>subscribe</code> method in order to register itself
137139
* to receive push-based notifications from the Observable. A typical implementation of the
@@ -156,11 +158,16 @@ protected Observable(Func1<Observer<T>, Subscription> onSubscribe) {
156158
* @param observer
157159
* @return a {@link Subscription} reference that allows observers
158160
* to stop receiving notifications before the provider has finished sending them
161+
* @throws IllegalArgumentException
162+
* if null argument provided
159163
*/
160164
public Subscription subscribe(Observer<T> observer) {
161165
// allow the hook to intercept and/or decorate
162166
Func1<Observer<T>, Subscription> onSubscribeFunction = hook.onSubscribeStart(this, onSubscribe);
163167
// validate and proceed
168+
if (observer == null) {
169+
throw new IllegalArgumentException("observer can not be null");
170+
}
164171
if (onSubscribeFunction == null) {
165172
throw new IllegalStateException("onSubscribe function can not be null.");
166173
// the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception
@@ -183,10 +190,16 @@ public Subscription subscribe(Observer<T> observer) {
183190
subscription.wrap(onSubscribeFunction.call(new AtomicObserver<T>(subscription, observer)));
184191
return hook.onSubscribeReturn(this, subscription);
185192
}
193+
} catch (OnErrorNotImplementedException e) {
194+
// special handling when onError is not implemented ... we just rethrow
195+
throw e;
186196
} catch (Exception e) {
187197
// if an unhandled error occurs executing the onSubscribe we will propagate it
188198
try {
189199
observer.onError(hook.onSubscribeError(this, e));
200+
} catch (OnErrorNotImplementedException e2) {
201+
// special handling when onError is not implemented ... we just rethrow
202+
throw e2;
190203
} catch (Exception e2) {
191204
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
192205
// so we are unable to propagate the error correctly and will just throw
@@ -223,6 +236,8 @@ public Subscription subscribe(Observer<T> observer) {
223236
* The {@link Scheduler} that the sequence is subscribed to on.
224237
* @return a {@link Subscription} reference that allows observers
225238
* to stop receiving notifications before the provider has finished sending them
239+
* @throws IllegalArgumentException
240+
* if null argument provided
226241
*/
227242
public Subscription subscribe(Observer<T> observer, Scheduler scheduler) {
228243
return subscribeOn(scheduler).subscribe(observer);
@@ -240,11 +255,14 @@ private Subscription protectivelyWrapAndSubscribe(Observer<T> o) {
240255

241256
@SuppressWarnings({ "rawtypes", "unchecked" })
242257
public Subscription subscribe(final Map<String, Object> callbacks) {
243-
// lookup and memoize onNext
258+
if (callbacks == null) {
259+
throw new RuntimeException("callbacks map can not be null");
260+
}
244261
Object _onNext = callbacks.get("onNext");
245262
if (_onNext == null) {
246-
throw new RuntimeException("onNext must be implemented");
263+
throw new RuntimeException("'onNext' key must contain an implementation");
247264
}
265+
// lookup and memoize onNext
248266
final FuncN onNext = Functions.from(_onNext);
249267

250268
/**
@@ -268,6 +286,8 @@ public void onError(Exception e) {
268286
Object onError = callbacks.get("onError");
269287
if (onError != null) {
270288
Functions.from(onError).call(e);
289+
} else {
290+
throw new OnErrorNotImplementedException(e);
271291
}
272292
}
273293

@@ -290,10 +310,11 @@ public Subscription subscribe(final Object o) {
290310
return subscribe((Observer) o);
291311
}
292312

293-
// lookup and memoize onNext
294313
if (o == null) {
295-
throw new RuntimeException("onNext must be implemented");
314+
throw new IllegalArgumentException("onNext can not be null");
296315
}
316+
317+
// lookup and memoize onNext
297318
final FuncN onNext = Functions.from(o);
298319

299320
/**
@@ -311,7 +332,7 @@ public void onCompleted() {
311332
@Override
312333
public void onError(Exception e) {
313334
handleError(e);
314-
// no callback defined
335+
throw new OnErrorNotImplementedException(e);
315336
}
316337

317338
@Override
@@ -327,6 +348,9 @@ public Subscription subscribe(final Object o, Scheduler scheduler) {
327348
}
328349

329350
public Subscription subscribe(final Action1<T> onNext) {
351+
if (onNext == null) {
352+
throw new IllegalArgumentException("onNext can not be null");
353+
}
330354

331355
/**
332356
* Wrapping since raw functions provided by the user are being invoked.
@@ -343,14 +367,11 @@ public void onCompleted() {
343367
@Override
344368
public void onError(Exception e) {
345369
handleError(e);
346-
// no callback defined
370+
throw new OnErrorNotImplementedException(e);
347371
}
348372

349373
@Override
350374
public void onNext(T args) {
351-
if (onNext == null) {
352-
throw new RuntimeException("onNext must be implemented");
353-
}
354375
onNext.call(args);
355376
}
356377

@@ -363,10 +384,14 @@ public Subscription subscribe(final Action1<T> onNext, Scheduler scheduler) {
363384

364385
@SuppressWarnings({ "rawtypes", "unchecked" })
365386
public Subscription subscribe(final Object onNext, final Object onError) {
366-
// lookup and memoize onNext
367387
if (onNext == null) {
368-
throw new RuntimeException("onNext must be implemented");
388+
throw new IllegalArgumentException("onNext can not be null");
389+
}
390+
if (onError == null) {
391+
throw new IllegalArgumentException("onError can not be null");
369392
}
393+
394+
// lookup and memoize onNext
370395
final FuncN onNextFunction = Functions.from(onNext);
371396

372397
/**
@@ -384,9 +409,7 @@ public void onCompleted() {
384409
@Override
385410
public void onError(Exception e) {
386411
handleError(e);
387-
if (onError != null) {
388-
Functions.from(onError).call(e);
389-
}
412+
Functions.from(onError).call(e);
390413
}
391414

392415
@Override
@@ -402,6 +425,12 @@ public Subscription subscribe(final Object onNext, final Object onError, Schedul
402425
}
403426

404427
public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError) {
428+
if (onNext == null) {
429+
throw new IllegalArgumentException("onNext can not be null");
430+
}
431+
if (onError == null) {
432+
throw new IllegalArgumentException("onError can not be null");
433+
}
405434

406435
/**
407436
* Wrapping since raw functions provided by the user are being invoked.
@@ -418,16 +447,11 @@ public void onCompleted() {
418447
@Override
419448
public void onError(Exception e) {
420449
handleError(e);
421-
if (onError != null) {
422-
onError.call(e);
423-
}
450+
onError.call(e);
424451
}
425452

426453
@Override
427454
public void onNext(T args) {
428-
if (onNext == null) {
429-
throw new RuntimeException("onNext must be implemented");
430-
}
431455
onNext.call(args);
432456
}
433457

@@ -440,10 +464,17 @@ public Subscription subscribe(final Action1<T> onNext, final Action1<Exception>
440464

441465
@SuppressWarnings({ "rawtypes", "unchecked" })
442466
public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete) {
443-
// lookup and memoize onNext
444467
if (onNext == null) {
445-
throw new RuntimeException("onNext must be implemented");
468+
throw new IllegalArgumentException("onNext can not be null");
469+
}
470+
if (onError == null) {
471+
throw new IllegalArgumentException("onError can not be null");
472+
}
473+
if (onComplete == null) {
474+
throw new IllegalArgumentException("onComplete can not be null");
446475
}
476+
477+
// lookup and memoize onNext
447478
final FuncN onNextFunction = Functions.from(onNext);
448479

449480
/**
@@ -455,17 +486,13 @@ public Subscription subscribe(final Object onNext, final Object onError, final O
455486

456487
@Override
457488
public void onCompleted() {
458-
if (onComplete != null) {
459-
Functions.from(onComplete).call();
460-
}
489+
Functions.from(onComplete).call();
461490
}
462491

463492
@Override
464493
public void onError(Exception e) {
465494
handleError(e);
466-
if (onError != null) {
467-
Functions.from(onError).call(e);
468-
}
495+
Functions.from(onError).call(e);
469496
}
470497

471498
@Override
@@ -481,6 +508,15 @@ public Subscription subscribe(final Object onNext, final Object onError, final O
481508
}
482509

483510
public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError, final Action0 onComplete) {
511+
if (onNext == null) {
512+
throw new IllegalArgumentException("onNext can not be null");
513+
}
514+
if (onError == null) {
515+
throw new IllegalArgumentException("onError can not be null");
516+
}
517+
if (onComplete == null) {
518+
throw new IllegalArgumentException("onComplete can not be null");
519+
}
484520

485521
/**
486522
* Wrapping since raw functions provided by the user are being invoked.
@@ -497,16 +533,11 @@ public void onCompleted() {
497533
@Override
498534
public void onError(Exception e) {
499535
handleError(e);
500-
if (onError != null) {
501-
onError.call(e);
502-
}
536+
onError.call(e);
503537
}
504538

505539
@Override
506540
public void onNext(T args) {
507-
if (onNext == null) {
508-
throw new RuntimeException("onNext must be implemented");
509-
}
510541
onNext.call(args);
511542
}
512543

@@ -3768,6 +3799,79 @@ public void call(String v) {
37683799
assertEquals(1, counter.get());
37693800
}
37703801

3802+
/**
3803+
* https://github.com/Netflix/RxJava/issues/198
3804+
*
3805+
* Rx Design Guidelines 5.2
3806+
*
3807+
* "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be
3808+
* to rethrow the exception on the thread that the message comes out from the observable sequence.
3809+
* The OnCompleted behavior in this case is to do nothing."
3810+
*/
3811+
@Test
3812+
public void testErrorThrownWithoutErrorHandlerSynchronous() {
3813+
try {
3814+
error(new RuntimeException("failure")).subscribe(new Action1<Object>() {
3815+
3816+
@Override
3817+
public void call(Object t1) {
3818+
// won't get anything
3819+
}
3820+
3821+
});
3822+
fail("expected exception");
3823+
} catch (Exception e) {
3824+
assertEquals("failure", e.getMessage());
3825+
}
3826+
}
3827+
3828+
/**
3829+
* https://github.com/Netflix/RxJava/issues/198
3830+
*
3831+
* Rx Design Guidelines 5.2
3832+
*
3833+
* "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be
3834+
* to rethrow the exception on the thread that the message comes out from the observable sequence.
3835+
* The OnCompleted behavior in this case is to do nothing."
3836+
*
3837+
* @throws InterruptedException
3838+
*/
3839+
@Test
3840+
public void testErrorThrownWithoutErrorHandlerAsynchronous() throws InterruptedException {
3841+
final CountDownLatch latch = new CountDownLatch(1);
3842+
final AtomicReference<Exception> exception = new AtomicReference<Exception>();
3843+
Observable.create(new Func1<Observer<String>, Subscription>() {
3844+
3845+
@Override
3846+
public Subscription call(final Observer<String> observer) {
3847+
new Thread(new Runnable() {
3848+
3849+
@Override
3850+
public void run() {
3851+
try {
3852+
observer.onError(new RuntimeException("failure"));
3853+
} catch (Exception e) {
3854+
// without an onError handler it has to just throw on whatever thread invokes it
3855+
exception.set(e);
3856+
}
3857+
latch.countDown();
3858+
}
3859+
}).start();
3860+
return Subscriptions.empty();
3861+
}
3862+
}).subscribe(new Action1<Object>() {
3863+
3864+
@Override
3865+
public void call(Object t1) {
3866+
3867+
}
3868+
3869+
});
3870+
// wait for exception
3871+
latch.await(3000, TimeUnit.MILLISECONDS);
3872+
assertNotNull(exception.get());
3873+
assertEquals("failure", exception.get().getMessage());
3874+
}
37713875
}
37723876

37733877
}

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,18 @@
4646
*/
4747
public class BlockingObservable<T> extends Observable<T> {
4848

49+
protected BlockingObservable(Func1<Observer<T>, Subscription> onSubscribe) {
50+
super(onSubscribe);
51+
}
52+
53+
/**
54+
* Used to prevent public instantiation
55+
*/
56+
@SuppressWarnings("unused")
57+
private BlockingObservable() {
58+
// prevent public instantiation
59+
}
60+
4961
public static <T> BlockingObservable<T> from(final Observable<T> o) {
5062
return new BlockingObservable<T>(new Func1<Observer<T>, Subscription>() {
5163

@@ -354,10 +366,6 @@ public static <T> Iterable<T> toIterable(final Observable<T> source) {
354366
return from(source).toIterable();
355367
}
356368

357-
protected BlockingObservable(Func1<Observer<T>, Subscription> onSubscribe) {
358-
super(onSubscribe);
359-
}
360-
361369
/**
362370
* Used for protecting against errors being thrown from Observer implementations and ensuring onNext/onError/onCompleted contract compliance.
363371
* <p>

0 commit comments

Comments
 (0)