Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -12725,15 +12725,15 @@ public final Flowable<T> onBackpressureLatest() {
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onBackpressureReduce} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 3.0.9 - experimental
* @param reducer the bi-function to call when there is more than one non-emitted value to downstream,
* the first argument of the bi-function is previous item and the second one is currently
* emitting from upstream
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code reducer} is {@code null}
* @since 3.0.9 - experimental
* @since 3.1.0
* @see #onBackpressureReduce(Supplier, BiFunction)
*/
@Experimental
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
Expand Down Expand Up @@ -12764,6 +12764,7 @@ public final Flowable<T> onBackpressureReduce(@NonNull BiFunction<T, T, T> reduc
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onBackpressureReduce} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 3.0.9 - experimental
* @param <R> the aggregate type emitted when the downstream requests more items
* @param supplier the factory to call to create new item of type R to pass it as the first argument to {@code reducer}.
* It is called when previous returned value by {@code reducer} already sent to
Expand All @@ -12774,9 +12775,8 @@ public final Flowable<T> onBackpressureReduce(@NonNull BiFunction<T, T, T> reduc
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code supplier} or {@code reducer} is {@code null}
* @see #onBackpressureReduce(BiFunction)
* @since 3.0.9 - experimental
* @since 3.1.0
*/
@Experimental
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@

package io.reactivex.rxjava3.internal.schedulers;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Function;
Expand All @@ -31,86 +29,11 @@ private SchedulerPoolFactory() {

static final String PURGE_ENABLED_KEY = "rx3.purge-enabled";

/**
* Indicates the periodic purging of the ScheduledExecutorService is enabled.
*/
public static final boolean PURGE_ENABLED;

static final String PURGE_PERIOD_SECONDS_KEY = "rx3.purge-period-seconds";

/**
* Indicates the purge period of the ScheduledExecutorServices created by create().
*/
public static final int PURGE_PERIOD_SECONDS;

static final AtomicReference<ScheduledExecutorService> PURGE_THREAD =
new AtomicReference<>();

// Upcast to the Map interface here to avoid 8.x compatibility issues.
// See http://stackoverflow.com/a/32955708/61158
static final Map<ScheduledThreadPoolExecutor, Object> POOLS =
new ConcurrentHashMap<>();

/**
* Starts the purge thread if not already started.
*/
public static void start() {
tryStart(PURGE_ENABLED);
}

static void tryStart(boolean purgeEnabled) {
if (purgeEnabled) {
for (;;) {
ScheduledExecutorService curr = PURGE_THREAD.get();
if (curr != null) {
return;
}
ScheduledExecutorService next = Executors.newScheduledThreadPool(1, new RxThreadFactory("RxSchedulerPurge"));
if (PURGE_THREAD.compareAndSet(curr, next)) {

next.scheduleAtFixedRate(new ScheduledTask(), PURGE_PERIOD_SECONDS, PURGE_PERIOD_SECONDS, TimeUnit.SECONDS);

return;
} else {
next.shutdownNow();
}
}
}
}

/**
* Stops the purge thread.
*/
public static void shutdown() {
ScheduledExecutorService exec = PURGE_THREAD.getAndSet(null);
if (exec != null) {
exec.shutdownNow();
}
POOLS.clear();
}

static {
SystemPropertyAccessor propertyAccessor = new SystemPropertyAccessor();
PURGE_ENABLED = getBooleanProperty(true, PURGE_ENABLED_KEY, true, true, propertyAccessor);
PURGE_PERIOD_SECONDS = getIntProperty(PURGE_ENABLED, PURGE_PERIOD_SECONDS_KEY, 1, 1, propertyAccessor);

start();
}

static int getIntProperty(boolean enabled, String key, int defaultNotFound, int defaultNotEnabled, Function<String, String> propertyAccessor) {
if (enabled) {
try {
String value = propertyAccessor.apply(key);
if (value == null) {
return defaultNotFound;
}
return Integer.parseInt(value);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
return defaultNotFound;
}
}
return defaultNotEnabled;
}

static boolean getBooleanProperty(boolean enabled, String key, boolean defaultNotFound, boolean defaultNotEnabled, Function<String, String> propertyAccessor) {
Expand Down Expand Up @@ -142,28 +65,8 @@ public String apply(String t) {
* @return the ScheduledExecutorService
*/
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
tryPutIntoPool(PURGE_ENABLED, exec);
final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, factory);
exec.setRemoveOnCancelPolicy(PURGE_ENABLED);
return exec;
}

static void tryPutIntoPool(boolean purgeEnabled, ScheduledExecutorService exec) {
if (purgeEnabled && exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
}

static final class ScheduledTask implements Runnable {
@Override
public void run() {
for (ScheduledThreadPoolExecutor e : new ArrayList<>(POOLS.keySet())) {
if (e.isShutdown()) {
POOLS.remove(e);
} else {
e.purge();
}
}
}
}
}
8 changes: 4 additions & 4 deletions src/main/java/io/reactivex/rxjava3/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -1153,11 +1153,11 @@ public static void setOnParallelAssembly(@Nullable Function<? super ParallelFlow

/**
* Sets the specific hook function.
* <p>History: 3.0.11 - experimental
* @param handler the hook function to set, null allowed
* @since 3.0.11 - experimental
* @since 3.1.0
*/
@SuppressWarnings("rawtypes")
@Experimental
public static void setOnParallelSubscribe(@Nullable BiFunction<? super ParallelFlowable, ? super Subscriber[], ? extends Subscriber[]> handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
Expand All @@ -1167,11 +1167,11 @@ public static void setOnParallelSubscribe(@Nullable BiFunction<? super ParallelF

/**
* Returns the current hook function.
* <p>History: 3.0.11 - experimental
* @return the hook function, may be null
* @since 3.0.11 - experimental
* @since 3.1.0
*/
@SuppressWarnings("rawtypes")
@Experimental
@Nullable
public static BiFunction<? super ParallelFlowable, ? super Subscriber[], ? extends Subscriber[]> getOnParallelSubscribe() {
return onParallelSubscribe;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@
* <li>{@code rx3.computation-priority} (int): sets the thread priority of the {@link #computation()} {@code Scheduler}, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx3.newthread-priority} (int): sets the thread priority of the {@link #newThread()} {@code Scheduler}, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx3.single-priority} (int): sets the thread priority of the {@link #single()} {@code Scheduler}, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx3.purge-enabled} (boolean): enables periodic purging of all {@code Scheduler}'s backing thread pools, default is {@code false}</li>
* <li>{@code rx3.purge-period-seconds} (int): specifies the periodic purge interval of all {@code Scheduler}'s backing thread pools, default is 1 second</li>
* <li>{@code rx3.purge-enabled} (boolean): enables purging of all {@code Scheduler}'s backing thread pools, default is {@code true}</li>
* <li>{@code rx3.scheduler.use-nanotime} (boolean): {@code true} instructs {@code Scheduler} to use {@link System#nanoTime()} for {@link Scheduler#now(TimeUnit)},
* instead of default {@link System#currentTimeMillis()} ({@code false})</li>
* </ul>
Expand Down Expand Up @@ -556,7 +555,6 @@ public static void shutdown() {
newThread().shutdown();
single().shutdown();
trampoline().shutdown();
SchedulerPoolFactory.shutdown();
}

/**
Expand All @@ -569,7 +567,6 @@ public static void start() {
newThread().start();
single().start();
trampoline().start();
SchedulerPoolFactory.start();
}

static final class IOTask implements Supplier<Scheduler> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ public TestScheduler() {
/**
* Creates a new TestScheduler with the option to use the
* {@link RxJavaPlugins#onSchedule(Runnable)} hook when scheduling tasks.
* <p>History: 3.0.10 - experimental
* @param useOnScheduleHook if {@code true}, the tasks submitted to this
* TestScheduler is wrapped via the
* {@link RxJavaPlugins#onSchedule(Runnable)} hook
* @since 3.0.10 - experimental
* @since 3.1.0
*/
@Experimental
public TestScheduler(boolean useOnScheduleHook) {
this.useOnScheduleHook = useOnScheduleHook;
}
Expand All @@ -78,17 +78,16 @@ public TestScheduler(long delayTime, TimeUnit unit) {
* Creates a new TestScheduler with the specified initial virtual time
* and with the option to use the
* {@link RxJavaPlugins#onSchedule(Runnable)} hook when scheduling tasks.
*
* <p>History: 3.0.10 - experimental
* @param delayTime
* the point in time to move the Scheduler's clock to
* @param unit
* the units of time that {@code delayTime} is expressed in
* @param useOnScheduleHook if {@code true}, the tasks submitted to this
* TestScheduler is wrapped via the
* {@link RxJavaPlugins#onSchedule(Runnable)} hook
* @since 3.0.10 - experimental
* @since 3.1.0
*/
@Experimental
public TestScheduler(long delayTime, TimeUnit unit, boolean useOnScheduleHook) {
time = unit.toNanos(delayTime);
this.useOnScheduleHook = useOnScheduleHook;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.reactivex.rxjava3.core.RxJavaTest;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.functions.Functions;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.TestHelper;

public class SchedulerPoolFactoryTest extends RxJavaTest {
Expand All @@ -30,50 +29,6 @@ public void utilityClass() {
TestHelper.checkUtilityClass(SchedulerPoolFactory.class);
}

@Test
public void multiStartStop() {
SchedulerPoolFactory.shutdown();

SchedulerPoolFactory.shutdown();

SchedulerPoolFactory.tryStart(false);

assertNull(SchedulerPoolFactory.PURGE_THREAD.get());

SchedulerPoolFactory.start();

// restart schedulers
Schedulers.shutdown();

Schedulers.start();
}

@Test
public void startRace() throws InterruptedException {
try {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
SchedulerPoolFactory.shutdown();

Runnable r1 = new Runnable() {
@Override
public void run() {
SchedulerPoolFactory.start();
}
};

TestHelper.race(r1, r1);
}

} finally {
// restart schedulers
Schedulers.shutdown();

Thread.sleep(200);

Schedulers.start();
}
}

@Test
public void boolPropertiesDisabledReturnsDefaultDisabled() throws Throwable {
assertTrue(SchedulerPoolFactory.getBooleanProperty(false, "key", false, true, failingPropertiesAccessor));
Expand All @@ -98,30 +53,6 @@ public void boolPropertiesReturnsValue() throws Throwable {
assertFalse(SchedulerPoolFactory.getBooleanProperty(true, "false", false, true, Functions.<String>identity()));
}

@Test
public void intPropertiesDisabledReturnsDefaultDisabled() throws Throwable {
assertEquals(-1, SchedulerPoolFactory.getIntProperty(false, "key", 0, -1, failingPropertiesAccessor));
assertEquals(-1, SchedulerPoolFactory.getIntProperty(false, "key", 1, -1, failingPropertiesAccessor));
}

@Test
public void intPropertiesEnabledMissingReturnsDefaultMissing() throws Throwable {
assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 0, missingPropertiesAccessor));
assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 1, missingPropertiesAccessor));
}

@Test
public void intPropertiesFailureReturnsDefaultMissing() throws Throwable {
assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 0, failingPropertiesAccessor));
assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 1, failingPropertiesAccessor));
}

@Test
public void intPropertiesReturnsValue() throws Throwable {
assertEquals(1, SchedulerPoolFactory.getIntProperty(true, "1", 0, 4, Functions.<String>identity()));
assertEquals(2, SchedulerPoolFactory.getIntProperty(true, "2", 3, 5, Functions.<String>identity()));
}

static final Function<String, String> failingPropertiesAccessor = new Function<String, String>() {
@Override
public String apply(String v) throws Throwable {
Expand All @@ -135,22 +66,4 @@ public String apply(String v) throws Throwable {
return null;
}
};

@Test
public void putIntoPoolNoPurge() {
int s = SchedulerPoolFactory.POOLS.size();

SchedulerPoolFactory.tryPutIntoPool(false, null);

assertEquals(s, SchedulerPoolFactory.POOLS.size());
}

@Test
public void putIntoPoolNonThreadPool() {
int s = SchedulerPoolFactory.POOLS.size();

SchedulerPoolFactory.tryPutIntoPool(true, null);

assertEquals(s, SchedulerPoolFactory.POOLS.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void run() {

System.out.println("Wait before second GC");
System.out.println("JDK 6 purge is N log N because it removes and shifts one by one");
int t = (int)(n * Math.log(n) / 100) + SchedulerPoolFactory.PURGE_PERIOD_SECONDS * 1000;
int t = (int)(n * Math.log(n) / 100) + 1000;
int sleepStep = 100;
while (t > 0) {
System.out.printf(" >> Waiting for purge: %.2f s remaining%n", t / 1000d);
Expand Down