diff --git a/src/main/java/rx/internal/util/ObjectPool.java b/src/main/java/rx/internal/util/ObjectPool.java index 0aa005208e..35da79335a 100644 --- a/src/main/java/rx/internal/util/ObjectPool.java +++ b/src/main/java/rx/internal/util/ObjectPool.java @@ -21,11 +21,8 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; -import rx.Scheduler.Worker; -import rx.functions.Action0; -import rx.internal.schedulers.SchedulerLifecycle; +import rx.internal.schedulers.*; import rx.internal.util.unsafe.*; -import rx.schedulers.Schedulers; public abstract class ObjectPool implements SchedulerLifecycle { Queue pool; @@ -33,7 +30,7 @@ public abstract class ObjectPool implements SchedulerLifecycle { final int maxSize; private final long validationInterval; - private final AtomicReference schedulerWorker; + private final AtomicReference> periodicTask; public ObjectPool() { this(0, 0, 67); @@ -55,7 +52,7 @@ private ObjectPool(final int min, final int max, final long validationInterval) this.minSize = min; this.maxSize = max; this.validationInterval = validationInterval; - this.schedulerWorker = new AtomicReference(); + this.periodicTask = new AtomicReference>(); // initialize pool initialize(min); @@ -96,38 +93,51 @@ public void returnObject(T object) { */ @Override public void shutdown() { - Worker w = schedulerWorker.getAndSet(null); - if (w != null) { - w.unsubscribe(); + Future f = periodicTask.getAndSet(null); + if (f != null) { + f.cancel(false); } } @Override public void start() { - Worker w = Schedulers.computation().createWorker(); - if (schedulerWorker.compareAndSet(null, w)) { - w.schedulePeriodically(new Action0() { - - @Override - public void call() { - int size = pool.size(); - if (size < minSize) { - int sizeToBeAdded = maxSize - size; - for (int i = 0; i < sizeToBeAdded; i++) { - pool.add(createObject()); - } - } else if (size > maxSize) { - int sizeToBeRemoved = size - maxSize; - for (int i = 0; i < sizeToBeRemoved; i++) { - // pool.pollLast(); - pool.poll(); + for (;;) { + if (periodicTask.get() != null) { + return; + } + ScheduledExecutorService w = GenericScheduledExecutorService.getInstance(); + + Future f; + try { + f = w.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + int size = pool.size(); + if (size < minSize) { + int sizeToBeAdded = maxSize - size; + for (int i = 0; i < sizeToBeAdded; i++) { + pool.add(createObject()); + } + } else if (size > maxSize) { + int sizeToBeRemoved = size - maxSize; + for (int i = 0; i < sizeToBeRemoved; i++) { + // pool.pollLast(); + pool.poll(); + } } } - } - - }, validationInterval, validationInterval, TimeUnit.SECONDS); - } else { - w.unsubscribe(); + + }, validationInterval, validationInterval, TimeUnit.SECONDS); + } catch (RejectedExecutionException ex) { + RxJavaPluginUtils.handleException(ex); + break; + } + if (!periodicTask.compareAndSet(null, f)) { + f.cancel(false); + } else { + break; + } } }