File tree Expand file tree Collapse file tree 1 file changed +7
-7
lines changed
rxjava-core/src/main/java/rx Expand file tree Collapse file tree 1 file changed +7
-7
lines changed Original file line number Diff line number Diff line change @@ -3325,7 +3325,11 @@ private boolean isInternalImplementation(Object o) {
3325
3325
if (o == null ) {
3326
3326
return true ;
3327
3327
}
3328
- return (o .getClass ().getPackage ().getName ().startsWith ("rx." ));
3328
+ // prevent double-wrapping (yeah it happens)
3329
+ if (o instanceof AtomicObserver )
3330
+ return true ;
3331
+ // we treat the following package as "internal" and don't wrap it
3332
+ return o .getClass ().getPackage ().getName ().startsWith ("rx.operators" );
3329
3333
}
3330
3334
3331
3335
public static class UnitTest {
@@ -3658,11 +3662,7 @@ public void run() {
3658
3662
}).start ();
3659
3663
return s ;
3660
3664
}
3661
- }).subscribe (new AtomicObserver <String >(new AtomicObservableSubscription (), new Observer <String >() {
3662
- // we are manually wrapping in AtomicObserver here to simulate
3663
- // what will happen when a user provided Observer implementation is passed in
3664
- // since the subscribe method will wrap it in AtomicObserver if it's not in an rx.* package
3665
-
3665
+ }).subscribe (new Observer <String >() {
3666
3666
@ Override
3667
3667
public void onCompleted () {
3668
3668
System .out .println ("completed" );
@@ -3683,7 +3683,7 @@ public void onNext(String v) {
3683
3683
count .incrementAndGet ();
3684
3684
}
3685
3685
3686
- })) ;
3686
+ });
3687
3687
3688
3688
// wait for async sequence to complete
3689
3689
latch .await ();
You can’t perform that action at this time.
0 commit comments