19
19
import static org .mockito .Matchers .*;
20
20
import static org .mockito .Mockito .*;
21
21
22
+ import java .util .Random ;
22
23
import java .util .concurrent .ExecutorService ;
23
24
import java .util .concurrent .Executors ;
24
25
import java .util .concurrent .Future ;
@@ -68,10 +69,18 @@ public final class SynchronizedObserver<T> implements Observer<T> {
68
69
private final SafeObservableSubscription subscription ;
69
70
private volatile boolean finishRequested = false ;
70
71
private volatile boolean finished = false ;
72
+ private volatile Object lock ;
71
73
72
74
public SynchronizedObserver (Observer <? super T > Observer , SafeObservableSubscription subscription ) {
73
75
this .observer = Observer ;
74
76
this .subscription = subscription ;
77
+ this .lock = this ;
78
+ }
79
+
80
+ public SynchronizedObserver (Observer <? super T > Observer , SafeObservableSubscription subscription , Object lock ) {
81
+ this .observer = Observer ;
82
+ this .subscription = subscription ;
83
+ this .lock = lock ;
75
84
}
76
85
77
86
/**
@@ -80,16 +89,15 @@ public SynchronizedObserver(Observer<? super T> Observer, SafeObservableSubscrip
80
89
* @param Observer
81
90
*/
82
91
public SynchronizedObserver (Observer <? super T > Observer ) {
83
- this .observer = Observer ;
84
- this .subscription = new SafeObservableSubscription ();
92
+ this (Observer , new SafeObservableSubscription ());
85
93
}
86
94
87
95
public void onNext (T arg ) {
88
96
if (finished || finishRequested || subscription .isUnsubscribed ()) {
89
97
// if we're already stopped, or a finish request has been received, we won't allow further onNext requests
90
98
return ;
91
99
}
92
- synchronized (this ) {
100
+ synchronized (lock ) {
93
101
// check again since this could have changed while waiting
94
102
if (finished || finishRequested || subscription .isUnsubscribed ()) {
95
103
// if we're already stopped, or a finish request has been received, we won't allow further onNext requests
@@ -105,7 +113,7 @@ public void onError(Throwable e) {
105
113
return ;
106
114
}
107
115
finishRequested = true ;
108
- synchronized (this ) {
116
+ synchronized (lock ) {
109
117
// check again since this could have changed while waiting
110
118
if (finished || subscription .isUnsubscribed ()) {
111
119
return ;
@@ -121,7 +129,7 @@ public void onCompleted() {
121
129
return ;
122
130
}
123
131
finishRequested = true ;
124
- synchronized (this ) {
132
+ synchronized (lock ) {
125
133
// check again since this could have changed while waiting
126
134
if (finished || subscription .isUnsubscribed ()) {
127
135
return ;
@@ -188,6 +196,46 @@ public void testMultiThreadedBasic() {
188
196
assertEquals (1 , busyObserver .maxConcurrentThreads .get ());
189
197
}
190
198
199
+ @ Test
200
+ public void testMultiThreadedBasicWithLock () {
201
+ Subscription s = mock (Subscription .class );
202
+ TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable (s , "one" , "two" , "three" );
203
+ Observable <String > w = Observable .create (onSubscribe );
204
+
205
+ SafeObservableSubscription as = new SafeObservableSubscription (s );
206
+ BusyObserver busyObserver = new BusyObserver ();
207
+
208
+ Object lock = new Object ();
209
+ ExternalBusyThread externalBusyThread = new ExternalBusyThread (busyObserver , lock , 10 , 100 );
210
+
211
+ SynchronizedObserver <String > aw = new SynchronizedObserver <String >(busyObserver , as , lock );
212
+
213
+ externalBusyThread .start ();
214
+
215
+ w .subscribe (aw );
216
+ onSubscribe .waitToFinish ();
217
+
218
+ try {
219
+ externalBusyThread .join (10000 );
220
+ assertFalse (externalBusyThread .isAlive ());
221
+ assertFalse (externalBusyThread .fail );
222
+ } catch (InterruptedException e ) {
223
+ // ignore
224
+ }
225
+
226
+ assertEquals (3 , busyObserver .onNextCount .get ());
227
+ assertFalse (busyObserver .onError );
228
+ assertTrue (busyObserver .onCompleted );
229
+ // non-deterministic because unsubscribe happens after 'waitToFinish' releases
230
+ // so commenting out for now as this is not a critical thing to test here
231
+ // verify(s, times(1)).unsubscribe();
232
+
233
+ // we can have concurrency ...
234
+ assertTrue (onSubscribe .maxConcurrentThreads .get () > 1 );
235
+ // ... but the onNext execution should be single threaded
236
+ assertEquals (1 , busyObserver .maxConcurrentThreads .get ());
237
+ }
238
+
191
239
@ Test
192
240
public void testMultiThreadedWithNPE () {
193
241
Subscription s = mock (Subscription .class );
@@ -220,6 +268,52 @@ public void testMultiThreadedWithNPE() {
220
268
assertEquals (1 , busyObserver .maxConcurrentThreads .get ());
221
269
}
222
270
271
+ @ Test
272
+ public void testMultiThreadedWithNPEAndLock () {
273
+ Subscription s = mock (Subscription .class );
274
+ TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable (s , "one" , "two" , "three" , null );
275
+ Observable <String > w = Observable .create (onSubscribe );
276
+
277
+ SafeObservableSubscription as = new SafeObservableSubscription (s );
278
+ BusyObserver busyObserver = new BusyObserver ();
279
+
280
+ Object lock = new Object ();
281
+ ExternalBusyThread externalBusyThread = new ExternalBusyThread (busyObserver , lock , 10 , 100 );
282
+
283
+ SynchronizedObserver <String > aw = new SynchronizedObserver <String >(busyObserver , as , lock );
284
+
285
+ externalBusyThread .start ();
286
+
287
+ w .subscribe (aw );
288
+ onSubscribe .waitToFinish ();
289
+
290
+ try {
291
+ externalBusyThread .join (10000 );
292
+ assertFalse (externalBusyThread .isAlive ());
293
+ assertFalse (externalBusyThread .fail );
294
+ } catch (InterruptedException e ) {
295
+ // ignore
296
+ }
297
+
298
+ System .out .println ("maxConcurrentThreads: " + onSubscribe .maxConcurrentThreads .get ());
299
+
300
+ // we can't know how many onNext calls will occur since they each run on a separate thread
301
+ // that depends on thread scheduling so 0, 1, 2 and 3 are all valid options
302
+ // assertEquals(3, busyObserver.onNextCount.get());
303
+ assertTrue (busyObserver .onNextCount .get () < 4 );
304
+ assertTrue (busyObserver .onError );
305
+ // no onCompleted because onError was invoked
306
+ assertFalse (busyObserver .onCompleted );
307
+ // non-deterministic because unsubscribe happens after 'waitToFinish' releases
308
+ // so commenting out for now as this is not a critical thing to test here
309
+ //verify(s, times(1)).unsubscribe();
310
+
311
+ // we can have concurrency ...
312
+ assertTrue (onSubscribe .maxConcurrentThreads .get () > 1 );
313
+ // ... but the onNext execution should be single threaded
314
+ assertEquals (1 , busyObserver .maxConcurrentThreads .get ());
315
+ }
316
+
223
317
@ Test
224
318
public void testMultiThreadedWithNPEinMiddle () {
225
319
Subscription s = mock (Subscription .class );
@@ -250,6 +344,50 @@ public void testMultiThreadedWithNPEinMiddle() {
250
344
assertEquals (1 , busyObserver .maxConcurrentThreads .get ());
251
345
}
252
346
347
+ @ Test
348
+ public void testMultiThreadedWithNPEinMiddleAndLock () {
349
+ Subscription s = mock (Subscription .class );
350
+ TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable (s , "one" , "two" , "three" , null , "four" , "five" , "six" , "seven" , "eight" , "nine" );
351
+ Observable <String > w = Observable .create (onSubscribe );
352
+
353
+ SafeObservableSubscription as = new SafeObservableSubscription (s );
354
+ BusyObserver busyObserver = new BusyObserver ();
355
+
356
+ Object lock = new Object ();
357
+ ExternalBusyThread externalBusyThread = new ExternalBusyThread (busyObserver , lock , 10 , 100 );
358
+
359
+ SynchronizedObserver <String > aw = new SynchronizedObserver <String >(busyObserver , as , lock );
360
+
361
+ externalBusyThread .start ();
362
+
363
+ w .subscribe (aw );
364
+ onSubscribe .waitToFinish ();
365
+
366
+ try {
367
+ externalBusyThread .join (10000 );
368
+ assertFalse (externalBusyThread .isAlive ());
369
+ assertFalse (externalBusyThread .fail );
370
+ } catch (InterruptedException e ) {
371
+ // ignore
372
+ }
373
+
374
+ System .out .println ("maxConcurrentThreads: " + onSubscribe .maxConcurrentThreads .get ());
375
+ // this should not be the full number of items since the error should stop it before it completes all 9
376
+ System .out .println ("onNext count: " + busyObserver .onNextCount .get ());
377
+ assertTrue (busyObserver .onNextCount .get () < 9 );
378
+ assertTrue (busyObserver .onError );
379
+ // no onCompleted because onError was invoked
380
+ assertFalse (busyObserver .onCompleted );
381
+ // non-deterministic because unsubscribe happens after 'waitToFinish' releases
382
+ // so commenting out for now as this is not a critical thing to test here
383
+ // verify(s, times(1)).unsubscribe();
384
+
385
+ // we can have concurrency ...
386
+ assertTrue (onSubscribe .maxConcurrentThreads .get () > 1 );
387
+ // ... but the onNext execution should be single threaded
388
+ assertEquals (1 , busyObserver .maxConcurrentThreads .get ());
389
+ }
390
+
253
391
/**
254
392
* A non-realistic use case that tries to expose thread-safety issues by throwing lots of out-of-order
255
393
* events on many threads.
@@ -617,14 +755,32 @@ private static class BusyObserver implements Observer<String> {
617
755
618
756
@ Override
619
757
public void onCompleted () {
758
+ threadsRunning .incrementAndGet ();
759
+
620
760
System .out .println (">>> BusyObserver received onCompleted" );
621
761
onCompleted = true ;
762
+
763
+ int concurrentThreads = threadsRunning .get ();
764
+ int maxThreads = maxConcurrentThreads .get ();
765
+ if (concurrentThreads > maxThreads ) {
766
+ maxConcurrentThreads .compareAndSet (maxThreads , concurrentThreads );
767
+ }
768
+ threadsRunning .decrementAndGet ();
622
769
}
623
770
624
771
@ Override
625
772
public void onError (Throwable e ) {
773
+ threadsRunning .incrementAndGet ();
774
+
626
775
System .out .println (">>> BusyObserver received onError: " + e .getMessage ());
627
776
onError = true ;
777
+
778
+ int concurrentThreads = threadsRunning .get ();
779
+ int maxThreads = maxConcurrentThreads .get ();
780
+ if (concurrentThreads > maxThreads ) {
781
+ maxConcurrentThreads .compareAndSet (maxThreads , concurrentThreads );
782
+ }
783
+ threadsRunning .decrementAndGet ();
628
784
}
629
785
630
786
@ Override
@@ -652,6 +808,70 @@ public void onNext(String args) {
652
808
653
809
}
654
810
811
+ private static class ExternalBusyThread extends Thread {
812
+
813
+ private BusyObserver observer ;
814
+ private Object lock ;
815
+ private int lockTimes ;
816
+ private int waitTime ;
817
+ public volatile boolean fail ;
818
+
819
+ public ExternalBusyThread (BusyObserver observer , Object lock , int lockTimes , int waitTime ) {
820
+ this .observer = observer ;
821
+ this .lock = lock ;
822
+ this .lockTimes = lockTimes ;
823
+ this .waitTime = waitTime ;
824
+ this .fail = false ;
825
+ }
826
+
827
+ @ Override
828
+ public void run () {
829
+ Random r = new Random ();
830
+ for (int i = 0 ; i < lockTimes ; i ++) {
831
+ synchronized (lock ) {
832
+ int oldOnNextCount = observer .onNextCount .get ();
833
+ boolean oldOnCompleted = observer .onCompleted ;
834
+ boolean oldOnError = observer .onError ;
835
+ try {
836
+ Thread .sleep (r .nextInt (waitTime ));
837
+ } catch (InterruptedException e ) {
838
+ // ignore
839
+ }
840
+ // Since we own the lock, onNextCount, onCompleted and
841
+ // onError must not be changed.
842
+ int newOnNextCount = observer .onNextCount .get ();
843
+ boolean newOnCompleted = observer .onCompleted ;
844
+ boolean newOnError = observer .onError ;
845
+ if (oldOnNextCount != newOnNextCount ) {
846
+ System .out .println (">>> ExternalBusyThread received different onNextCount: "
847
+ + oldOnNextCount
848
+ + " -> "
849
+ + newOnNextCount );
850
+ fail = true ;
851
+ break ;
852
+ }
853
+ if (oldOnCompleted != newOnCompleted ) {
854
+ System .out .println (">>> ExternalBusyThread received different onCompleted: "
855
+ + oldOnCompleted
856
+ + " -> "
857
+ + newOnCompleted );
858
+ fail = true ;
859
+ break ;
860
+ }
861
+ if (oldOnError != newOnError ) {
862
+ System .out .println (">>> ExternalBusyThread received different onError: "
863
+ + oldOnError
864
+ + " -> "
865
+ + newOnError );
866
+ fail = true ;
867
+ break ;
868
+ }
869
+ }
870
+ }
871
+ }
872
+
873
+ }
874
+
655
875
}
656
876
657
877
}
0 commit comments