diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/SchedulerPoolFactory.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/SchedulerPoolFactory.java index 043569334c..515c2f2061 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/SchedulerPoolFactory.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/SchedulerPoolFactory.java @@ -31,86 +31,11 @@ private SchedulerPoolFactory() { static final String PURGE_ENABLED_KEY = "rx3.purge-enabled"; - /** - * Indicates the periodic purging of the ScheduledExecutorService is enabled. - */ public static final boolean PURGE_ENABLED; - static final String PURGE_PERIOD_SECONDS_KEY = "rx3.purge-period-seconds"; - - /** - * Indicates the purge period of the ScheduledExecutorServices created by create(). - */ - public static final int PURGE_PERIOD_SECONDS; - - static final AtomicReference PURGE_THREAD = - new AtomicReference<>(); - - // Upcast to the Map interface here to avoid 8.x compatibility issues. - // See http://stackoverflow.com/a/32955708/61158 - static final Map POOLS = - new ConcurrentHashMap<>(); - - /** - * Starts the purge thread if not already started. - */ - public static void start() { - tryStart(PURGE_ENABLED); - } - - static void tryStart(boolean purgeEnabled) { - if (purgeEnabled) { - for (;;) { - ScheduledExecutorService curr = PURGE_THREAD.get(); - if (curr != null) { - return; - } - ScheduledExecutorService next = Executors.newScheduledThreadPool(1, new RxThreadFactory("RxSchedulerPurge")); - if (PURGE_THREAD.compareAndSet(curr, next)) { - - next.scheduleAtFixedRate(new ScheduledTask(), PURGE_PERIOD_SECONDS, PURGE_PERIOD_SECONDS, TimeUnit.SECONDS); - - return; - } else { - next.shutdownNow(); - } - } - } - } - - /** - * Stops the purge thread. - */ - public static void shutdown() { - ScheduledExecutorService exec = PURGE_THREAD.getAndSet(null); - if (exec != null) { - exec.shutdownNow(); - } - POOLS.clear(); - } - static { SystemPropertyAccessor propertyAccessor = new SystemPropertyAccessor(); PURGE_ENABLED = getBooleanProperty(true, PURGE_ENABLED_KEY, true, true, propertyAccessor); - PURGE_PERIOD_SECONDS = getIntProperty(PURGE_ENABLED, PURGE_PERIOD_SECONDS_KEY, 1, 1, propertyAccessor); - - start(); - } - - static int getIntProperty(boolean enabled, String key, int defaultNotFound, int defaultNotEnabled, Function propertyAccessor) { - if (enabled) { - try { - String value = propertyAccessor.apply(key); - if (value == null) { - return defaultNotFound; - } - return Integer.parseInt(value); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - return defaultNotFound; - } - } - return defaultNotEnabled; } static boolean getBooleanProperty(boolean enabled, String key, boolean defaultNotFound, boolean defaultNotEnabled, Function propertyAccessor) { @@ -142,28 +67,8 @@ public String apply(String t) { * @return the ScheduledExecutorService */ public static ScheduledExecutorService create(ThreadFactory factory) { - final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory); - tryPutIntoPool(PURGE_ENABLED, exec); + final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, factory); + exec.setRemoveOnCancelPolicy(PURGE_ENABLED); return exec; } - - static void tryPutIntoPool(boolean purgeEnabled, ScheduledExecutorService exec) { - if (purgeEnabled && exec instanceof ScheduledThreadPoolExecutor) { - ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec; - POOLS.put(e, exec); - } - } - - static final class ScheduledTask implements Runnable { - @Override - public void run() { - for (ScheduledThreadPoolExecutor e : new ArrayList<>(POOLS.keySet())) { - if (e.isShutdown()) { - POOLS.remove(e); - } else { - e.purge(); - } - } - } - } } diff --git a/src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java b/src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java index 0c0ecbdbe5..f5dbe6ce7c 100644 --- a/src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java +++ b/src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java @@ -38,8 +38,7 @@ *
  • {@code rx3.computation-priority} (int): sets the thread priority of the {@link #computation()} {@code Scheduler}, default is {@link Thread#NORM_PRIORITY}
  • *
  • {@code rx3.newthread-priority} (int): sets the thread priority of the {@link #newThread()} {@code Scheduler}, default is {@link Thread#NORM_PRIORITY}
  • *
  • {@code rx3.single-priority} (int): sets the thread priority of the {@link #single()} {@code Scheduler}, default is {@link Thread#NORM_PRIORITY}
  • - *
  • {@code rx3.purge-enabled} (boolean): enables periodic purging of all {@code Scheduler}'s backing thread pools, default is {@code false}
  • - *
  • {@code rx3.purge-period-seconds} (int): specifies the periodic purge interval of all {@code Scheduler}'s backing thread pools, default is 1 second
  • + *
  • {@code rx3.purge-enabled} (boolean): enables purging of all {@code Scheduler}'s backing thread pools, default is {@code true}
  • *
  • {@code rx3.scheduler.use-nanotime} (boolean): {@code true} instructs {@code Scheduler} to use {@link System#nanoTime()} for {@link Scheduler#now(TimeUnit)}, * instead of default {@link System#currentTimeMillis()} ({@code false})
  • * @@ -556,7 +555,6 @@ public static void shutdown() { newThread().shutdown(); single().shutdown(); trampoline().shutdown(); - SchedulerPoolFactory.shutdown(); } /** @@ -569,7 +567,6 @@ public static void start() { newThread().start(); single().start(); trampoline().start(); - SchedulerPoolFactory.start(); } static final class IOTask implements Supplier { diff --git a/src/test/java/io/reactivex/rxjava3/internal/schedulers/SchedulerPoolFactoryTest.java b/src/test/java/io/reactivex/rxjava3/internal/schedulers/SchedulerPoolFactoryTest.java index dd5667ffbb..5d93dabc09 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/schedulers/SchedulerPoolFactoryTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/schedulers/SchedulerPoolFactoryTest.java @@ -30,50 +30,6 @@ public void utilityClass() { TestHelper.checkUtilityClass(SchedulerPoolFactory.class); } - @Test - public void multiStartStop() { - SchedulerPoolFactory.shutdown(); - - SchedulerPoolFactory.shutdown(); - - SchedulerPoolFactory.tryStart(false); - - assertNull(SchedulerPoolFactory.PURGE_THREAD.get()); - - SchedulerPoolFactory.start(); - - // restart schedulers - Schedulers.shutdown(); - - Schedulers.start(); - } - - @Test - public void startRace() throws InterruptedException { - try { - for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { - SchedulerPoolFactory.shutdown(); - - Runnable r1 = new Runnable() { - @Override - public void run() { - SchedulerPoolFactory.start(); - } - }; - - TestHelper.race(r1, r1); - } - - } finally { - // restart schedulers - Schedulers.shutdown(); - - Thread.sleep(200); - - Schedulers.start(); - } - } - @Test public void boolPropertiesDisabledReturnsDefaultDisabled() throws Throwable { assertTrue(SchedulerPoolFactory.getBooleanProperty(false, "key", false, true, failingPropertiesAccessor)); @@ -98,30 +54,6 @@ public void boolPropertiesReturnsValue() throws Throwable { assertFalse(SchedulerPoolFactory.getBooleanProperty(true, "false", false, true, Functions.identity())); } - @Test - public void intPropertiesDisabledReturnsDefaultDisabled() throws Throwable { - assertEquals(-1, SchedulerPoolFactory.getIntProperty(false, "key", 0, -1, failingPropertiesAccessor)); - assertEquals(-1, SchedulerPoolFactory.getIntProperty(false, "key", 1, -1, failingPropertiesAccessor)); - } - - @Test - public void intPropertiesEnabledMissingReturnsDefaultMissing() throws Throwable { - assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 0, missingPropertiesAccessor)); - assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 1, missingPropertiesAccessor)); - } - - @Test - public void intPropertiesFailureReturnsDefaultMissing() throws Throwable { - assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 0, failingPropertiesAccessor)); - assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 1, failingPropertiesAccessor)); - } - - @Test - public void intPropertiesReturnsValue() throws Throwable { - assertEquals(1, SchedulerPoolFactory.getIntProperty(true, "1", 0, 4, Functions.identity())); - assertEquals(2, SchedulerPoolFactory.getIntProperty(true, "2", 3, 5, Functions.identity())); - } - static final Function failingPropertiesAccessor = new Function() { @Override public String apply(String v) throws Throwable { @@ -135,22 +67,4 @@ public String apply(String v) throws Throwable { return null; } }; - - @Test - public void putIntoPoolNoPurge() { - int s = SchedulerPoolFactory.POOLS.size(); - - SchedulerPoolFactory.tryPutIntoPool(false, null); - - assertEquals(s, SchedulerPoolFactory.POOLS.size()); - } - - @Test - public void putIntoPoolNonThreadPool() { - int s = SchedulerPoolFactory.POOLS.size(); - - SchedulerPoolFactory.tryPutIntoPool(true, null); - - assertEquals(s, SchedulerPoolFactory.POOLS.size()); - } } diff --git a/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerTest.java b/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerTest.java index c4ba94189c..bb3e759884 100644 --- a/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerTest.java +++ b/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerTest.java @@ -93,7 +93,7 @@ public void run() { System.out.println("Wait before second GC"); System.out.println("JDK 6 purge is N log N because it removes and shifts one by one"); - int t = (int)(n * Math.log(n) / 100) + SchedulerPoolFactory.PURGE_PERIOD_SECONDS * 1000; + int t = (int)(n * Math.log(n) / 100) + 1000; int sleepStep = 100; while (t > 0) { System.out.printf(" >> Waiting for purge: %.2f s remaining%n", t / 1000d);