From 17ad6a49703df2e1300ecfe912c200e6d103cdbd Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 14 Mar 2017 14:34:23 +0100 Subject: [PATCH 1/2] 2.x: add offer() method to Publish & Behavior Processors --- .../processors/BehaviorProcessor.java | 41 ++++++++++++- .../processors/PublishProcessor.java | 39 ++++++++++++- .../processors/BehaviorProcessorTest.java | 53 +++++++++++++++++ .../processors/PublishProcessorTest.java | 51 +++++++++++++++++ .../tck/AsyncProcessorAsPublisherTckTest.java | 57 +++++++++++++++++++ .../BehaviorProcessorAsPublisherTckTest.java | 53 +++++++++++++++++ .../PublishProcessorAsPublisherTckTest.java | 57 +++++++++++++++++++ ...yProcessorSizeBoundAsPublisherTckTest.java | 52 +++++++++++++++++ ...yProcessorTimeBoundAsPublisherTckTest.java | 54 ++++++++++++++++++ ...yProcessorUnboundedAsPublisherTckTest.java | 52 +++++++++++++++++ .../UnicastProcessorAsPublisherTckTest.java | 52 +++++++++++++++++ 11 files changed, 559 insertions(+), 2 deletions(-) create mode 100644 src/test/java/io/reactivex/tck/AsyncProcessorAsPublisherTckTest.java create mode 100644 src/test/java/io/reactivex/tck/BehaviorProcessorAsPublisherTckTest.java create mode 100644 src/test/java/io/reactivex/tck/PublishProcessorAsPublisherTckTest.java create mode 100644 src/test/java/io/reactivex/tck/ReplayProcessorSizeBoundAsPublisherTckTest.java create mode 100644 src/test/java/io/reactivex/tck/ReplayProcessorTimeBoundAsPublisherTckTest.java create mode 100644 src/test/java/io/reactivex/tck/ReplayProcessorUnboundedAsPublisherTckTest.java create mode 100644 src/test/java/io/reactivex/tck/UnicastProcessorAsPublisherTckTest.java diff --git a/src/main/java/io/reactivex/processors/BehaviorProcessor.java b/src/main/java/io/reactivex/processors/BehaviorProcessor.java index 3b81f6d062..4a4a69b007 100644 --- a/src/main/java/io/reactivex/processors/BehaviorProcessor.java +++ b/src/main/java/io/reactivex/processors/BehaviorProcessor.java @@ -13,13 +13,13 @@ package io.reactivex.processors; -import io.reactivex.annotations.CheckReturnValue; import java.lang.reflect.Array; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.*; import org.reactivestreams.*; +import io.reactivex.annotations.*; import io.reactivex.exceptions.MissingBackpressureException; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.subscriptions.SubscriptionHelper; @@ -217,6 +217,41 @@ public void onComplete() { } } + /** + * Tries to emit the item to all currently subscribed Subscribers if all of them + * has requested some value, returns false otherwise. + *

+ * This method should be called in a sequential manner just like the onXXX methods + * of the PublishProcessor. + *

+ * Calling with null will terminate the PublishProcessor and a NullPointerException + * is signalled to the Subscribers. + * @param t the item to emit, not null + * @return true if the item was emitted to all Subscribers + * @since 2.0.8 - experimental + */ + @Experimental + public boolean offer(T t) { + if (t == null) { + onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); + return true; + } + BehaviorSubscription[] array = subscribers.get(); + + for (BehaviorSubscription s : array) { + if (s.isFull()) { + return false; + } + } + + Object o = NotificationLite.next(t); + setCurrent(o); + for (BehaviorSubscription bs : array) { + bs.emitNext(o, index); + } + return true; + } + @Override public boolean hasSubscribers() { return subscribers.get().length != 0; @@ -538,5 +573,9 @@ void emitLoop() { q.forEachWhile(this); } } + + public boolean isFull() { + return get() == 0L; + } } } diff --git a/src/main/java/io/reactivex/processors/PublishProcessor.java b/src/main/java/io/reactivex/processors/PublishProcessor.java index 86abf96cab..554632c0e6 100644 --- a/src/main/java/io/reactivex/processors/PublishProcessor.java +++ b/src/main/java/io/reactivex/processors/PublishProcessor.java @@ -12,11 +12,11 @@ */ package io.reactivex.processors; -import io.reactivex.annotations.CheckReturnValue; import java.util.concurrent.atomic.*; import org.reactivestreams.*; +import io.reactivex.annotations.*; import io.reactivex.exceptions.MissingBackpressureException; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.BackpressureHelper; @@ -227,6 +227,39 @@ public void onComplete() { } } + /** + * Tries to emit the item to all currently subscribed Subscribers if all of them + * has requested some value, returns false otherwise. + *

+ * This method should be called in a sequential manner just like the onXXX methods + * of the PublishProcessor. + *

+ * Calling with null will terminate the PublishProcessor and a NullPointerException + * is signalled to the Subscribers. + * @param t the item to emit, not null + * @return true if the item was emitted to all Subscribers + * @since 2.0.8 - experimental + */ + @Experimental + public boolean offer(T t) { + if (t == null) { + onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); + return true; + } + PublishSubscription[] array = subscribers.get(); + + for (PublishSubscription s : array) { + if (s.isFull()) { + return false; + } + } + + for (PublishSubscription s : array) { + s.onNext(t); + } + return true; + } + @Override public boolean hasSubscribers() { return subscribers.get().length != 0; @@ -321,5 +354,9 @@ public void cancel() { public boolean isCancelled() { return get() == Long.MIN_VALUE; } + + boolean isFull() { + return get() == 0L; + } } } diff --git a/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java b/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java index f91c8f98b1..04a66e9737 100644 --- a/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java +++ b/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java @@ -699,4 +699,57 @@ public void firstBackpressured() { assertFalse(p.hasSubscribers()); } + + @Test + public void offer() { + BehaviorProcessor pp = BehaviorProcessor.create(); + + TestSubscriber ts = pp.test(0); + + assertFalse(pp.offer(1)); + + ts.request(1); + + assertTrue(pp.offer(1)); + + assertFalse(pp.offer(2)); + + ts.cancel(); + + assertTrue(pp.offer(2)); + + ts = pp.test(1); + + assertTrue(pp.offer(null)); + + ts.assertFailure(NullPointerException.class, 2); + + assertTrue(pp.hasThrowable()); + assertTrue(pp.getThrowable().toString(), pp.getThrowable() instanceof NullPointerException); + } + + @Test + public void offerAsync() throws Exception { + final BehaviorProcessor pp = BehaviorProcessor.create(); + + Schedulers.single().scheduleDirect(new Runnable() { + @Override + public void run() { + while (!pp.hasSubscribers()) { + Thread.yield(); + } + + for (int i = 1; i <= 10; i++) { + while (!pp.offer(i)) { } + } + pp.onComplete(); + } + }); + + Thread.sleep(1); + + pp.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } } diff --git a/src/test/java/io/reactivex/processors/PublishProcessorTest.java b/src/test/java/io/reactivex/processors/PublishProcessorTest.java index 82e7ed7ac8..c29b7efa18 100644 --- a/src/test/java/io/reactivex/processors/PublishProcessorTest.java +++ b/src/test/java/io/reactivex/processors/PublishProcessorTest.java @@ -621,5 +621,56 @@ public void run() { } } + @Test + public void offer() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp.test(0); + + assertFalse(pp.offer(1)); + + ts.request(1); + + assertTrue(pp.offer(1)); + + assertFalse(pp.offer(2)); + + ts.cancel(); + + assertTrue(pp.offer(2)); + + ts = pp.test(0); + + assertTrue(pp.offer(null)); + + ts.assertFailure(NullPointerException.class); + assertTrue(pp.hasThrowable()); + assertTrue(pp.getThrowable().toString(), pp.getThrowable() instanceof NullPointerException); + } + + @Test + public void offerAsync() throws Exception { + final PublishProcessor pp = PublishProcessor.create(); + + Schedulers.single().scheduleDirect(new Runnable() { + @Override + public void run() { + while (!pp.hasSubscribers()) { + Thread.yield(); + } + + for (int i = 1; i <= 10; i++) { + while (!pp.offer(i)) { } + } + pp.onComplete(); + } + }); + + Thread.sleep(1); + + pp.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } } diff --git a/src/test/java/io/reactivex/tck/AsyncProcessorAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/AsyncProcessorAsPublisherTckTest.java new file mode 100644 index 0000000000..336475a5c9 --- /dev/null +++ b/src/test/java/io/reactivex/tck/AsyncProcessorAsPublisherTckTest.java @@ -0,0 +1,57 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.processors.AsyncProcessor; +import io.reactivex.schedulers.Schedulers; + +@Test +public class AsyncProcessorAsPublisherTckTest extends BaseTck { + + public AsyncProcessorAsPublisherTckTest() { + super(100); + } + + @Override + public Publisher createPublisher(final long elements) { + final AsyncProcessor pp = AsyncProcessor.create(); + + Schedulers.io().scheduleDirect(new Runnable() { + @Override + public void run() { + long start = System.currentTimeMillis(); + while (!pp.hasSubscribers()) { + Thread.yield(); + if (System.currentTimeMillis() - start > 200) { + return; + } + } + + for (int i = 0; i < elements; i++) { + pp.onNext(i); + } + pp.onComplete(); + } + }); + return pp; + } + + @Override + public long maxElementsFromPublisher() { + return 1; + } +} diff --git a/src/test/java/io/reactivex/tck/BehaviorProcessorAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/BehaviorProcessorAsPublisherTckTest.java new file mode 100644 index 0000000000..7f4a91a067 --- /dev/null +++ b/src/test/java/io/reactivex/tck/BehaviorProcessorAsPublisherTckTest.java @@ -0,0 +1,53 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.processors.BehaviorProcessor; +import io.reactivex.schedulers.Schedulers; + +@Test +public class BehaviorProcessorAsPublisherTckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + final BehaviorProcessor pp = BehaviorProcessor.create(); + + Schedulers.io().scheduleDirect(new Runnable() { + @Override + public void run() { + long start = System.currentTimeMillis(); + while (!pp.hasSubscribers()) { + if (System.currentTimeMillis() - start > 200) { + return; + } + Thread.yield(); + } + + for (int i = 0; i < elements; i++) { + while (!pp.offer(i)) { + Thread.yield(); + if (System.currentTimeMillis() - start > 1000) { + return; + } + } + } + pp.onComplete(); + } + }); + return pp; + } +} diff --git a/src/test/java/io/reactivex/tck/PublishProcessorAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/PublishProcessorAsPublisherTckTest.java new file mode 100644 index 0000000000..dfc1b6bb12 --- /dev/null +++ b/src/test/java/io/reactivex/tck/PublishProcessorAsPublisherTckTest.java @@ -0,0 +1,57 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; + +@Test +public class PublishProcessorAsPublisherTckTest extends BaseTck { + + public PublishProcessorAsPublisherTckTest() { + super(100); + } + + @Override + public Publisher createPublisher(final long elements) { + final PublishProcessor pp = PublishProcessor.create(); + + Schedulers.io().scheduleDirect(new Runnable() { + @Override + public void run() { + long start = System.currentTimeMillis(); + while (!pp.hasSubscribers()) { + Thread.yield(); + if (System.currentTimeMillis() - start > 200) { + return; + } + } + + for (int i = 0; i < elements; i++) { + while (!pp.offer(i)) { + Thread.yield(); + if (System.currentTimeMillis() - start > 1000) { + return; + } + } + } + pp.onComplete(); + } + }); + return pp; + } +} diff --git a/src/test/java/io/reactivex/tck/ReplayProcessorSizeBoundAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/ReplayProcessorSizeBoundAsPublisherTckTest.java new file mode 100644 index 0000000000..a78d547ee2 --- /dev/null +++ b/src/test/java/io/reactivex/tck/ReplayProcessorSizeBoundAsPublisherTckTest.java @@ -0,0 +1,52 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.processors.ReplayProcessor; +import io.reactivex.schedulers.Schedulers; + +@Test +public class ReplayProcessorSizeBoundAsPublisherTckTest extends BaseTck { + + public ReplayProcessorSizeBoundAsPublisherTckTest() { + super(100); + } + + @Override + public Publisher createPublisher(final long elements) { + final ReplayProcessor pp = ReplayProcessor.createWithSize((int)elements + 10); + + Schedulers.io().scheduleDirect(new Runnable() { + @Override + public void run() { + long start = System.currentTimeMillis(); + while (!pp.hasSubscribers()) { + Thread.yield(); + if (System.currentTimeMillis() - start > 200) { + return; + } + } + + for (int i = 0; i < elements; i++) { + pp.onNext(i); + } + pp.onComplete(); + } + }); + return pp; + } +} diff --git a/src/test/java/io/reactivex/tck/ReplayProcessorTimeBoundAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/ReplayProcessorTimeBoundAsPublisherTckTest.java new file mode 100644 index 0000000000..077dea9ab1 --- /dev/null +++ b/src/test/java/io/reactivex/tck/ReplayProcessorTimeBoundAsPublisherTckTest.java @@ -0,0 +1,54 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.processors.ReplayProcessor; +import io.reactivex.schedulers.Schedulers; + +@Test +public class ReplayProcessorTimeBoundAsPublisherTckTest extends BaseTck { + + public ReplayProcessorTimeBoundAsPublisherTckTest() { + super(100); + } + + @Override + public Publisher createPublisher(final long elements) { + final ReplayProcessor pp = ReplayProcessor.createWithTime(1, TimeUnit.MINUTES, Schedulers.computation()); + + Schedulers.io().scheduleDirect(new Runnable() { + @Override + public void run() { + long start = System.currentTimeMillis(); + while (!pp.hasSubscribers()) { + Thread.yield(); + if (System.currentTimeMillis() - start > 200) { + return; + } + } + + for (int i = 0; i < elements; i++) { + pp.onNext(i); + } + pp.onComplete(); + } + }); + return pp; + } +} diff --git a/src/test/java/io/reactivex/tck/ReplayProcessorUnboundedAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/ReplayProcessorUnboundedAsPublisherTckTest.java new file mode 100644 index 0000000000..6e17d619d8 --- /dev/null +++ b/src/test/java/io/reactivex/tck/ReplayProcessorUnboundedAsPublisherTckTest.java @@ -0,0 +1,52 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.processors.ReplayProcessor; +import io.reactivex.schedulers.Schedulers; + +@Test +public class ReplayProcessorUnboundedAsPublisherTckTest extends BaseTck { + + public ReplayProcessorUnboundedAsPublisherTckTest() { + super(100); + } + + @Override + public Publisher createPublisher(final long elements) { + final ReplayProcessor pp = ReplayProcessor.create(); + + Schedulers.io().scheduleDirect(new Runnable() { + @Override + public void run() { + long start = System.currentTimeMillis(); + while (!pp.hasSubscribers()) { + Thread.yield(); + if (System.currentTimeMillis() - start > 200) { + return; + } + } + + for (int i = 0; i < elements; i++) { + pp.onNext(i); + } + pp.onComplete(); + } + }); + return pp; + } +} diff --git a/src/test/java/io/reactivex/tck/UnicastProcessorAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/UnicastProcessorAsPublisherTckTest.java new file mode 100644 index 0000000000..207736cda8 --- /dev/null +++ b/src/test/java/io/reactivex/tck/UnicastProcessorAsPublisherTckTest.java @@ -0,0 +1,52 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.processors.*; +import io.reactivex.schedulers.Schedulers; + +@Test +public class UnicastProcessorAsPublisherTckTest extends BaseTck { + + public UnicastProcessorAsPublisherTckTest() { + super(100); + } + + @Override + public Publisher createPublisher(final long elements) { + final UnicastProcessor pp = UnicastProcessor.create(); + + Schedulers.io().scheduleDirect(new Runnable() { + @Override + public void run() { + long start = System.currentTimeMillis(); + while (!pp.hasSubscribers()) { + Thread.yield(); + if (System.currentTimeMillis() - start > 200) { + return; + } + } + + for (int i = 0; i < elements; i++) { + pp.onNext(i); + } + pp.onComplete(); + } + }); + return pp; + } +} From a2aa0f0313380142ef585a40c316a5e352872c41 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 14 Mar 2017 16:17:14 +0100 Subject: [PATCH 2/2] Sleep instead of yield. --- .../io/reactivex/processors/BehaviorProcessorTest.java | 6 +++++- .../java/io/reactivex/processors/PublishProcessorTest.java | 6 +++++- .../io/reactivex/tck/AsyncProcessorAsPublisherTckTest.java | 7 ++++++- .../reactivex/tck/BehaviorProcessorAsPublisherTckTest.java | 7 ++++++- .../reactivex/tck/PublishProcessorAsPublisherTckTest.java | 7 ++++++- .../tck/ReplayProcessorSizeBoundAsPublisherTckTest.java | 7 ++++++- .../tck/ReplayProcessorTimeBoundAsPublisherTckTest.java | 7 ++++++- .../tck/ReplayProcessorUnboundedAsPublisherTckTest.java | 7 ++++++- .../reactivex/tck/UnicastProcessorAsPublisherTckTest.java | 7 ++++++- 9 files changed, 52 insertions(+), 9 deletions(-) diff --git a/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java b/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java index 04a66e9737..48becde4ee 100644 --- a/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java +++ b/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java @@ -736,7 +736,11 @@ public void offerAsync() throws Exception { @Override public void run() { while (!pp.hasSubscribers()) { - Thread.yield(); + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } } for (int i = 1; i <= 10; i++) { diff --git a/src/test/java/io/reactivex/processors/PublishProcessorTest.java b/src/test/java/io/reactivex/processors/PublishProcessorTest.java index c29b7efa18..3078250ac0 100644 --- a/src/test/java/io/reactivex/processors/PublishProcessorTest.java +++ b/src/test/java/io/reactivex/processors/PublishProcessorTest.java @@ -657,7 +657,11 @@ public void offerAsync() throws Exception { @Override public void run() { while (!pp.hasSubscribers()) { - Thread.yield(); + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } } for (int i = 1; i <= 10; i++) { diff --git a/src/test/java/io/reactivex/tck/AsyncProcessorAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/AsyncProcessorAsPublisherTckTest.java index 336475a5c9..b1e3ba4405 100644 --- a/src/test/java/io/reactivex/tck/AsyncProcessorAsPublisherTckTest.java +++ b/src/test/java/io/reactivex/tck/AsyncProcessorAsPublisherTckTest.java @@ -35,7 +35,12 @@ public Publisher createPublisher(final long elements) { public void run() { long start = System.currentTimeMillis(); while (!pp.hasSubscribers()) { - Thread.yield(); + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + if (System.currentTimeMillis() - start > 200) { return; } diff --git a/src/test/java/io/reactivex/tck/BehaviorProcessorAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/BehaviorProcessorAsPublisherTckTest.java index 7f4a91a067..cf1a47887e 100644 --- a/src/test/java/io/reactivex/tck/BehaviorProcessorAsPublisherTckTest.java +++ b/src/test/java/io/reactivex/tck/BehaviorProcessorAsPublisherTckTest.java @@ -31,10 +31,15 @@ public Publisher createPublisher(final long elements) { public void run() { long start = System.currentTimeMillis(); while (!pp.hasSubscribers()) { + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + if (System.currentTimeMillis() - start > 200) { return; } - Thread.yield(); } for (int i = 0; i < elements; i++) { diff --git a/src/test/java/io/reactivex/tck/PublishProcessorAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/PublishProcessorAsPublisherTckTest.java index dfc1b6bb12..7b82099650 100644 --- a/src/test/java/io/reactivex/tck/PublishProcessorAsPublisherTckTest.java +++ b/src/test/java/io/reactivex/tck/PublishProcessorAsPublisherTckTest.java @@ -35,7 +35,12 @@ public Publisher createPublisher(final long elements) { public void run() { long start = System.currentTimeMillis(); while (!pp.hasSubscribers()) { - Thread.yield(); + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + if (System.currentTimeMillis() - start > 200) { return; } diff --git a/src/test/java/io/reactivex/tck/ReplayProcessorSizeBoundAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/ReplayProcessorSizeBoundAsPublisherTckTest.java index a78d547ee2..9e33b9b15d 100644 --- a/src/test/java/io/reactivex/tck/ReplayProcessorSizeBoundAsPublisherTckTest.java +++ b/src/test/java/io/reactivex/tck/ReplayProcessorSizeBoundAsPublisherTckTest.java @@ -35,7 +35,12 @@ public Publisher createPublisher(final long elements) { public void run() { long start = System.currentTimeMillis(); while (!pp.hasSubscribers()) { - Thread.yield(); + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + if (System.currentTimeMillis() - start > 200) { return; } diff --git a/src/test/java/io/reactivex/tck/ReplayProcessorTimeBoundAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/ReplayProcessorTimeBoundAsPublisherTckTest.java index 077dea9ab1..6edda768e6 100644 --- a/src/test/java/io/reactivex/tck/ReplayProcessorTimeBoundAsPublisherTckTest.java +++ b/src/test/java/io/reactivex/tck/ReplayProcessorTimeBoundAsPublisherTckTest.java @@ -37,7 +37,12 @@ public Publisher createPublisher(final long elements) { public void run() { long start = System.currentTimeMillis(); while (!pp.hasSubscribers()) { - Thread.yield(); + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + if (System.currentTimeMillis() - start > 200) { return; } diff --git a/src/test/java/io/reactivex/tck/ReplayProcessorUnboundedAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/ReplayProcessorUnboundedAsPublisherTckTest.java index 6e17d619d8..10796599aa 100644 --- a/src/test/java/io/reactivex/tck/ReplayProcessorUnboundedAsPublisherTckTest.java +++ b/src/test/java/io/reactivex/tck/ReplayProcessorUnboundedAsPublisherTckTest.java @@ -35,7 +35,12 @@ public Publisher createPublisher(final long elements) { public void run() { long start = System.currentTimeMillis(); while (!pp.hasSubscribers()) { - Thread.yield(); + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + if (System.currentTimeMillis() - start > 200) { return; } diff --git a/src/test/java/io/reactivex/tck/UnicastProcessorAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/UnicastProcessorAsPublisherTckTest.java index 207736cda8..d64164936b 100644 --- a/src/test/java/io/reactivex/tck/UnicastProcessorAsPublisherTckTest.java +++ b/src/test/java/io/reactivex/tck/UnicastProcessorAsPublisherTckTest.java @@ -35,7 +35,12 @@ public Publisher createPublisher(final long elements) { public void run() { long start = System.currentTimeMillis(); while (!pp.hasSubscribers()) { - Thread.yield(); + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + if (System.currentTimeMillis() - start > 200) { return; }