Skip to content

Commit dd51e5d

Browse files
committed
Merge pull request #3334 from akarnokd/DisposableTests2x
2.x: disposable unit tests + fix to RefCountDisposable behavior
2 parents 2e32a33 + b80a051 commit dd51e5d

15 files changed

+1947
-181
lines changed

src/main/java/io/reactivex/Observable.java

Lines changed: 973 additions & 176 deletions
Large diffs are not rendered by default.
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.annotations;
15+
16+
/**
17+
* Enumeration for various kinds of backpressure support.
18+
*/
19+
public enum BackpressureKind {
20+
/**
21+
* The backpressure-related requests pass through this operator without change
22+
*/
23+
PASS_THROUGH,
24+
/**
25+
* The operator fully supports backpressure and may coordinate downstream requests
26+
* with upstream requests through batching, arbitration or by other means.
27+
*/
28+
FULL,
29+
/**
30+
* The operator performs special backpressure management; see the associated javadoc.
31+
*/
32+
SPECIAL,
33+
/**
34+
* The operator requests Long.MAX_VALUE from upstream but respects the backpressure
35+
* of the downstream.
36+
*/
37+
UNBOUNDED_IN,
38+
/**
39+
* The operator will emit a MissingBackpressureException if the downstream didn't request
40+
* enough or in time.
41+
*/
42+
ERROR,
43+
/**
44+
* The operator ignores all kinds of backpressure and may overflow the downstream.
45+
*/
46+
NONE
47+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.annotations;
15+
16+
import java.lang.annotation.*;
17+
18+
/**
19+
* Indicates the backpressure support kind of the associated operator or class.
20+
*/
21+
@Retention(RetentionPolicy.RUNTIME)
22+
@Documented
23+
@Target({ElementType.METHOD, ElementType.TYPE})
24+
public @interface BackpressureSupport {
25+
/**
26+
* The backpressure supported by this method or class.
27+
* @return backpressure supported by this method or class.
28+
*/
29+
BackpressureKind value();
30+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.annotations;
15+
16+
/**
17+
* Indicates the feature is in beta state: it will be most likely stay but
18+
* the signature may change between versions without warning.
19+
*/
20+
public @interface Beta {
21+
22+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.annotations;
15+
16+
/**
17+
* Indicates the feature is in experimental state: its existence, signature or behavior
18+
* might change without warning from one release to the next.
19+
*/
20+
public @interface Experimental {
21+
22+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.annotations;
15+
16+
/**
17+
* Indicates what scheduler the method or class uses by default
18+
*/
19+
public enum SchedulerKind {
20+
/**
21+
* The operator/class doesn't use schedulers.
22+
*/
23+
NONE,
24+
/**
25+
* The operator/class runs on the computation scheduler or takes timing information from it.
26+
*/
27+
COMPUTATION,
28+
/**
29+
* The operator/class runs on the io scheduler or takes timing information from it.
30+
*/
31+
IO,
32+
/**
33+
* The operator/class runs on the new thread scheduler or takes timing information from it.
34+
*/
35+
NEW_THREAD,
36+
/**
37+
* The operator/class runs on the trampoline scheduler or takes timing information from it.
38+
*/
39+
TRAMPOLINE,
40+
/**
41+
* The operator/class runs on the single scheduler or takes timing information from it.
42+
*/
43+
SINGLE,
44+
/**
45+
* The operator/class requires a scheduler to be manually specified.
46+
*/
47+
CUSTOM
48+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.annotations;
15+
16+
import java.lang.annotation.*;
17+
18+
/**
19+
* Indicates what kind of scheduler the class or method uses.
20+
*/
21+
@Retention(RetentionPolicy.RUNTIME)
22+
@Documented
23+
@Target({ElementType.METHOD, ElementType.TYPE})
24+
public @interface SchedulerSupport {
25+
/**
26+
* The kind of scheduler the class or method uses.
27+
* @return the kind of scheduler the class or method uses
28+
*/
29+
SchedulerKind value();
30+
}

src/main/java/io/reactivex/disposables/RefCountDisposable.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,27 @@ public final class RefCountDisposable implements Disposable {
2727
volatile int count;
2828
static final AtomicIntegerFieldUpdater<RefCountDisposable> COUNT =
2929
AtomicIntegerFieldUpdater.newUpdater(RefCountDisposable.class, "count");
30-
30+
31+
volatile int once;
32+
static final AtomicIntegerFieldUpdater<RefCountDisposable> ONCE =
33+
AtomicIntegerFieldUpdater.newUpdater(RefCountDisposable.class, "once");
34+
3135
public RefCountDisposable(Disposable resource) {
3236
Objects.requireNonNull(resource);
3337
RESOURCE.lazySet(this, resource);
38+
COUNT.lazySet(this, 1);
3439
}
3540

3641
@Override
3742
public void dispose() {
43+
if (ONCE.compareAndSet(this, 0, 1)) {
44+
if (COUNT.decrementAndGet(this) == 0) {
45+
disposeActual();
46+
}
47+
}
48+
}
49+
50+
void disposeActual() {
3851
Disposable d = resource;
3952
if (d != DISPOSED) {
4053
d = RESOURCE.getAndSet(this, DISPOSED);
@@ -51,7 +64,7 @@ public Disposable get() {
5164

5265
void release() {
5366
if (COUNT.decrementAndGet(this) == 0) {
54-
dispose();
67+
disposeActual();
5568
}
5669
}
5770

src/main/java/io/reactivex/internal/disposables/SetCompositeResource.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import java.util.function.Consumer;
1717

1818
import io.reactivex.disposables.Disposable;
19-
import io.reactivex.internal.util.OpenHashSet;
19+
import io.reactivex.internal.util.*;
2020

2121
/**
2222
* A set-based composite resource with custom disposer callback.
@@ -143,7 +143,7 @@ public void dispose() {
143143
set = null;
144144
}
145145
if (s != null) {
146-
s.forEach(disposer);
146+
disposeAll(s);
147147
}
148148
}
149149
}
@@ -163,8 +163,14 @@ public void clear() {
163163
set = null;
164164
}
165165
if (s != null) {
166-
s.forEach(disposer);
166+
disposeAll(s);
167167
}
168168
}
169169
}
170+
void disposeAll(OpenHashSet<T> s) {
171+
Throwable ex = s.forEachSuppress(disposer);
172+
if (ex != null) {
173+
Exceptions.propagate(ex);
174+
}
175+
}
170176
}

src/main/java/io/reactivex/internal/util/OpenHashSet.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.util.Arrays;
2222
import java.util.function.Consumer;
2323

24+
import io.reactivex.exceptions.CompositeException;
25+
2426
/**
2527
* A simple open hash set with add, remove and clear capabilities only.
2628
* <p>Doesn't support nor checks for {@code null}s.
@@ -191,6 +193,34 @@ public void forEach(Consumer<? super T> consumer) {
191193
}
192194
}
193195
}
196+
197+
/**
198+
* Loops through all values in the set and collects any exceptions from the consumer
199+
* into a Throwable.
200+
* @param consumer the consumer to call
201+
* @return if not null, contains a CompositeException with all the suppressed exceptions
202+
*/
203+
public Throwable forEachSuppress(Consumer<? super T> consumer) {
204+
CompositeException ex = null;
205+
int count = 0;
206+
for (T k : keys) {
207+
if (k != null) {
208+
try {
209+
consumer.accept(k);
210+
} catch (Throwable e) {
211+
if (ex == null) {
212+
ex = new CompositeException();
213+
}
214+
count++;
215+
ex.addSuppressed(e);
216+
}
217+
}
218+
}
219+
if (count == 1) {
220+
return ex.getSuppressed()[0];
221+
}
222+
return ex;
223+
}
194224

195225
public boolean isEmpty() {
196226
return size == 0;

0 commit comments

Comments
 (0)