@@ -46,14 +46,14 @@ protected FlowableProcessor<Object> create() {
46
46
47
47
@ Test
48
48
public void testNeverCompleted () {
49
- AsyncProcessor <String > subject = AsyncProcessor .create ();
49
+ AsyncProcessor <String > processor = AsyncProcessor .create ();
50
50
51
51
Subscriber <String > observer = TestHelper .mockSubscriber ();
52
- subject .subscribe (observer );
52
+ processor .subscribe (observer );
53
53
54
- subject .onNext ("one" );
55
- subject .onNext ("two" );
56
- subject .onNext ("three" );
54
+ processor .onNext ("one" );
55
+ processor .onNext ("two" );
56
+ processor .onNext ("three" );
57
57
58
58
verify (observer , Mockito .never ()).onNext (anyString ());
59
59
verify (observer , Mockito .never ()).onError (testException );
@@ -62,15 +62,15 @@ public void testNeverCompleted() {
62
62
63
63
@ Test
64
64
public void testCompleted () {
65
- AsyncProcessor <String > subject = AsyncProcessor .create ();
65
+ AsyncProcessor <String > processor = AsyncProcessor .create ();
66
66
67
67
Subscriber <String > observer = TestHelper .mockSubscriber ();
68
- subject .subscribe (observer );
68
+ processor .subscribe (observer );
69
69
70
- subject .onNext ("one" );
71
- subject .onNext ("two" );
72
- subject .onNext ("three" );
73
- subject .onComplete ();
70
+ processor .onNext ("one" );
71
+ processor .onNext ("two" );
72
+ processor .onNext ("three" );
73
+ processor .onComplete ();
74
74
75
75
verify (observer , times (1 )).onNext ("three" );
76
76
verify (observer , Mockito .never ()).onError (any (Throwable .class ));
@@ -80,13 +80,13 @@ public void testCompleted() {
80
80
@ Test
81
81
@ Ignore ("Null values not allowed" )
82
82
public void testNull () {
83
- AsyncProcessor <String > subject = AsyncProcessor .create ();
83
+ AsyncProcessor <String > processor = AsyncProcessor .create ();
84
84
85
85
Subscriber <String > observer = TestHelper .mockSubscriber ();
86
- subject .subscribe (observer );
86
+ processor .subscribe (observer );
87
87
88
- subject .onNext (null );
89
- subject .onComplete ();
88
+ processor .onNext (null );
89
+ processor .onComplete ();
90
90
91
91
verify (observer , times (1 )).onNext (null );
92
92
verify (observer , Mockito .never ()).onError (any (Throwable .class ));
@@ -95,16 +95,16 @@ public void testNull() {
95
95
96
96
@ Test
97
97
public void testSubscribeAfterCompleted () {
98
- AsyncProcessor <String > subject = AsyncProcessor .create ();
98
+ AsyncProcessor <String > processor = AsyncProcessor .create ();
99
99
100
100
Subscriber <String > observer = TestHelper .mockSubscriber ();
101
101
102
- subject .onNext ("one" );
103
- subject .onNext ("two" );
104
- subject .onNext ("three" );
105
- subject .onComplete ();
102
+ processor .onNext ("one" );
103
+ processor .onNext ("two" );
104
+ processor .onNext ("three" );
105
+ processor .onComplete ();
106
106
107
- subject .subscribe (observer );
107
+ processor .subscribe (observer );
108
108
109
109
verify (observer , times (1 )).onNext ("three" );
110
110
verify (observer , Mockito .never ()).onError (any (Throwable .class ));
@@ -113,18 +113,18 @@ public void testSubscribeAfterCompleted() {
113
113
114
114
@ Test
115
115
public void testSubscribeAfterError () {
116
- AsyncProcessor <String > subject = AsyncProcessor .create ();
116
+ AsyncProcessor <String > processor = AsyncProcessor .create ();
117
117
118
118
Subscriber <String > observer = TestHelper .mockSubscriber ();
119
119
120
- subject .onNext ("one" );
121
- subject .onNext ("two" );
122
- subject .onNext ("three" );
120
+ processor .onNext ("one" );
121
+ processor .onNext ("two" );
122
+ processor .onNext ("three" );
123
123
124
124
RuntimeException re = new RuntimeException ("failed" );
125
- subject .onError (re );
125
+ processor .onError (re );
126
126
127
- subject .subscribe (observer );
127
+ processor .subscribe (observer );
128
128
129
129
verify (observer , times (1 )).onError (re );
130
130
verify (observer , Mockito .never ()).onNext (any (String .class ));
@@ -133,18 +133,18 @@ public void testSubscribeAfterError() {
133
133
134
134
@ Test
135
135
public void testError () {
136
- AsyncProcessor <String > subject = AsyncProcessor .create ();
136
+ AsyncProcessor <String > processor = AsyncProcessor .create ();
137
137
138
138
Subscriber <String > observer = TestHelper .mockSubscriber ();
139
- subject .subscribe (observer );
139
+ processor .subscribe (observer );
140
140
141
- subject .onNext ("one" );
142
- subject .onNext ("two" );
143
- subject .onNext ("three" );
144
- subject .onError (testException );
145
- subject .onNext ("four" );
146
- subject .onError (new Throwable ());
147
- subject .onComplete ();
141
+ processor .onNext ("one" );
142
+ processor .onNext ("two" );
143
+ processor .onNext ("three" );
144
+ processor .onError (testException );
145
+ processor .onNext ("four" );
146
+ processor .onError (new Throwable ());
147
+ processor .onComplete ();
148
148
149
149
verify (observer , Mockito .never ()).onNext (anyString ());
150
150
verify (observer , times (1 )).onError (testException );
@@ -153,23 +153,23 @@ public void testError() {
153
153
154
154
@ Test
155
155
public void testUnsubscribeBeforeCompleted () {
156
- AsyncProcessor <String > subject = AsyncProcessor .create ();
156
+ AsyncProcessor <String > processor = AsyncProcessor .create ();
157
157
158
158
Subscriber <String > observer = TestHelper .mockSubscriber ();
159
159
TestSubscriber <String > ts = new TestSubscriber <String >(observer );
160
- subject .subscribe (ts );
160
+ processor .subscribe (ts );
161
161
162
- subject .onNext ("one" );
163
- subject .onNext ("two" );
162
+ processor .onNext ("one" );
163
+ processor .onNext ("two" );
164
164
165
165
ts .dispose ();
166
166
167
167
verify (observer , Mockito .never ()).onNext (anyString ());
168
168
verify (observer , Mockito .never ()).onError (any (Throwable .class ));
169
169
verify (observer , Mockito .never ()).onComplete ();
170
170
171
- subject .onNext ("three" );
172
- subject .onComplete ();
171
+ processor .onNext ("three" );
172
+ processor .onComplete ();
173
173
174
174
verify (observer , Mockito .never ()).onNext (anyString ());
175
175
verify (observer , Mockito .never ()).onError (any (Throwable .class ));
@@ -178,12 +178,12 @@ public void testUnsubscribeBeforeCompleted() {
178
178
179
179
@ Test
180
180
public void testEmptySubjectCompleted () {
181
- AsyncProcessor <String > subject = AsyncProcessor .create ();
181
+ AsyncProcessor <String > processor = AsyncProcessor .create ();
182
182
183
183
Subscriber <String > observer = TestHelper .mockSubscriber ();
184
- subject .subscribe (observer );
184
+ processor .subscribe (observer );
185
185
186
- subject .onComplete ();
186
+ processor .onComplete ();
187
187
188
188
InOrder inOrder = inOrder (observer );
189
189
inOrder .verify (observer , never ()).onNext (null );
@@ -204,10 +204,10 @@ public void testSubscribeCompletionRaceCondition() {
204
204
* With the synchronization code in place I can not get this to fail on my laptop.
205
205
*/
206
206
for (int i = 0 ; i < 50 ; i ++) {
207
- final AsyncProcessor <String > subject = AsyncProcessor .create ();
207
+ final AsyncProcessor <String > processor = AsyncProcessor .create ();
208
208
final AtomicReference <String > value1 = new AtomicReference <String >();
209
209
210
- subject .subscribe (new Consumer <String >() {
210
+ processor .subscribe (new Consumer <String >() {
211
211
212
212
@ Override
213
213
public void accept (String t1 ) {
@@ -226,15 +226,15 @@ public void accept(String t1) {
226
226
227
227
@ Override
228
228
public void run () {
229
- subject .onNext ("value" );
230
- subject .onComplete ();
229
+ processor .onNext ("value" );
230
+ processor .onComplete ();
231
231
}
232
232
});
233
233
234
- SubjectSubscriberThread t2 = new SubjectSubscriberThread (subject );
235
- SubjectSubscriberThread t3 = new SubjectSubscriberThread (subject );
236
- SubjectSubscriberThread t4 = new SubjectSubscriberThread (subject );
237
- SubjectSubscriberThread t5 = new SubjectSubscriberThread (subject );
234
+ SubjectSubscriberThread t2 = new SubjectSubscriberThread (processor );
235
+ SubjectSubscriberThread t3 = new SubjectSubscriberThread (processor );
236
+ SubjectSubscriberThread t4 = new SubjectSubscriberThread (processor );
237
+ SubjectSubscriberThread t5 = new SubjectSubscriberThread (processor );
238
238
239
239
t2 .start ();
240
240
t3 .start ();
@@ -262,18 +262,18 @@ public void run() {
262
262
263
263
private static class SubjectSubscriberThread extends Thread {
264
264
265
- private final AsyncProcessor <String > subject ;
265
+ private final AsyncProcessor <String > processor ;
266
266
private final AtomicReference <String > value = new AtomicReference <String >();
267
267
268
- SubjectSubscriberThread (AsyncProcessor <String > subject ) {
269
- this .subject = subject ;
268
+ SubjectSubscriberThread (AsyncProcessor <String > processor ) {
269
+ this .processor = processor ;
270
270
}
271
271
272
272
@ Override
273
273
public void run () {
274
274
try {
275
275
// a timeout exception will happen if we don't get a terminal state
276
- String v = subject .timeout (2000 , TimeUnit .MILLISECONDS ).blockingSingle ();
276
+ String v = processor .timeout (2000 , TimeUnit .MILLISECONDS ).blockingSingle ();
277
277
value .set (v );
278
278
} catch (Exception e ) {
279
279
e .printStackTrace ();
0 commit comments