@@ -410,4 +410,136 @@ public void call() {
410410 }
411411 assertTrue ("Timeout means `unsubscribe` is not called" , unsubscribe .await (30 , TimeUnit .SECONDS ));
412412 }
413+
414+ @ Test
415+ public void testUnsubscribeFromSingleWhenInterrupted () throws InterruptedException {
416+ new InterruptionTests ().assertUnsubscribeIsInvoked ("single()" , new Action1 <BlockingObservable <Void >>() {
417+ @ Override
418+ public void call (final BlockingObservable <Void > o ) {
419+ o .single ();
420+ }
421+ });
422+ }
423+
424+ @ Test
425+ public void testUnsubscribeFromForEachWhenInterrupted () throws InterruptedException {
426+ new InterruptionTests ().assertUnsubscribeIsInvoked ("forEach()" , new Action1 <BlockingObservable <Void >>() {
427+ @ Override
428+ public void call (final BlockingObservable <Void > o ) {
429+ o .forEach (new Action1 <Void >() {
430+ @ Override
431+ public void call (final Void aVoid ) {
432+ // nothing
433+ }
434+ });
435+ }
436+ });
437+ }
438+
439+ @ Test
440+ public void testUnsubscribeFromFirstWhenInterrupted () throws InterruptedException {
441+ new InterruptionTests ().assertUnsubscribeIsInvoked ("first()" , new Action1 <BlockingObservable <Void >>() {
442+ @ Override
443+ public void call (final BlockingObservable <Void > o ) {
444+ o .first ();
445+ }
446+ });
447+ }
448+
449+ @ Test
450+ public void testUnsubscribeFromLastWhenInterrupted () throws InterruptedException {
451+ new InterruptionTests ().assertUnsubscribeIsInvoked ("last()" , new Action1 <BlockingObservable <Void >>() {
452+ @ Override
453+ public void call (final BlockingObservable <Void > o ) {
454+ o .last ();
455+ }
456+ });
457+ }
458+
459+ @ Test
460+ public void testUnsubscribeFromLatestWhenInterrupted () throws InterruptedException {
461+ new InterruptionTests ().assertUnsubscribeIsInvoked ("latest()" , new Action1 <BlockingObservable <Void >>() {
462+ @ Override
463+ public void call (final BlockingObservable <Void > o ) {
464+ o .latest ().iterator ().next ();
465+ }
466+ });
467+ }
468+
469+ @ Test
470+ public void testUnsubscribeFromNextWhenInterrupted () throws InterruptedException {
471+ new InterruptionTests ().assertUnsubscribeIsInvoked ("next()" , new Action1 <BlockingObservable <Void >>() {
472+ @ Override
473+ public void call (final BlockingObservable <Void > o ) {
474+ o .next ().iterator ().next ();
475+ }
476+ });
477+ }
478+
479+ @ Test
480+ public void testUnsubscribeFromGetIteratorWhenInterrupted () throws InterruptedException {
481+ new InterruptionTests ().assertUnsubscribeIsInvoked ("getIterator()" , new Action1 <BlockingObservable <Void >>() {
482+ @ Override
483+ public void call (final BlockingObservable <Void > o ) {
484+ o .getIterator ().next ();
485+ }
486+ });
487+ }
488+
489+ @ Test
490+ public void testUnsubscribeFromToIterableWhenInterrupted () throws InterruptedException {
491+ new InterruptionTests ().assertUnsubscribeIsInvoked ("toIterable()" , new Action1 <BlockingObservable <Void >>() {
492+ @ Override
493+ public void call (final BlockingObservable <Void > o ) {
494+ o .toIterable ().iterator ().next ();
495+ }
496+ });
497+ }
498+
499+ /** Utilities set for interruption behaviour tests. */
500+ private static class InterruptionTests {
501+
502+ private boolean isUnSubscribed ;
503+ private RuntimeException error ;
504+ private CountDownLatch latch = new CountDownLatch (1 );
505+
506+ private Observable <Void > createObservable () {
507+ return Observable .<Void >never ().doOnUnsubscribe (new Action0 () {
508+ @ Override
509+ public void call () {
510+ isUnSubscribed = true ;
511+ }
512+ });
513+ }
514+
515+ private void startBlockingAndInterrupt (final Action1 <BlockingObservable <Void >> blockingAction ) {
516+ Thread subscriptionThread = new Thread () {
517+ @ Override
518+ public void run () {
519+ try {
520+ blockingAction .call (createObservable ().toBlocking ());
521+ } catch (RuntimeException e ) {
522+ if (!(e .getCause () instanceof InterruptedException )) {
523+ error = e ;
524+ }
525+ }
526+ latch .countDown ();
527+ }
528+ };
529+ subscriptionThread .start ();
530+ subscriptionThread .interrupt ();
531+ }
532+
533+ void assertUnsubscribeIsInvoked (final String method , final Action1 <BlockingObservable <Void >> blockingAction )
534+ throws InterruptedException {
535+ startBlockingAndInterrupt (blockingAction );
536+ assertTrue ("Timeout means interruption is not performed" , latch .await (30 , TimeUnit .SECONDS ));
537+ if (error != null ) {
538+ throw error ;
539+ }
540+ assertTrue ("'unsubscribe' is not invoked when thread is interrupted for " + method , isUnSubscribed );
541+ }
542+
543+ }
544+
413545}
0 commit comments