diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ed29ed9a63..8c77c225438 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,16 @@ ## 7.3.0 [unreleased] + +### Bug Fixes + +- [#807](https://github.com/influxdata/influxdb-client-java/pull/807): fix: Issue/806: Add a lock to prevent the disposal of the timer while it is running. + + ### Features - [#821](https://github.com/influxdata/influxdb-client-java/pull/821): Prevent duplicate interceptors in OkHttpClient builder + ### Dependencies ⚠️ Important Notice: Starting from this release, we won’t be listing every dependency change in our changelog. This helps us maintain the project faster and focus on important features for our InfluxDB client. diff --git a/client/src/main/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushable.java b/client/src/main/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushable.java index c487d8980be..b58bca87f36 100644 --- a/client/src/main/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushable.java +++ b/client/src/main/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushable.java @@ -23,6 +23,8 @@ import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; import io.reactivex.rxjava3.internal.util.QueueDrainHelper; import io.reactivex.rxjava3.subscribers.SerializedSubscriber; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -103,6 +105,8 @@ static final class BufferExactBoundedSubscriber actual, Supplier bufferSupplier, @@ -171,8 +175,16 @@ public void onNext(T t) { producerIndex++; } - if (restartTimerOnMaxSize) { - timer.dispose(); + // If try lock fails, it indicates that the timer is doing run logic, + // so there is no need to dispose it. + if (timerLock.tryLock()) { + try { + if (restartTimerOnMaxSize) { + timer.dispose(); + } + } finally { + timerLock.unlock(); + } } fastPathOrderedEmitMax(b, false, this); @@ -257,6 +269,18 @@ public boolean isDisposed() { @Override public void run() { + // If try lock fails, it indicates that the timer is doing dispose, + // so there is no need to do the actual running action. + if (timerLock.tryLock()) { + try { + doRun(); + } finally { + timerLock.unlock(); + } + } + } + + public void doRun() { U next; try { diff --git a/client/src/test/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushableTest.java b/client/src/test/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushableTest.java index 4d9b248df8a..89ec98ee3f7 100644 --- a/client/src/test/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushableTest.java +++ b/client/src/test/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushableTest.java @@ -1,16 +1,19 @@ package com.influxdb.client.internal.flowable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; - import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.functions.Consumer; import io.reactivex.rxjava3.internal.util.ArrayListSupplier; import io.reactivex.rxjava3.processors.PublishProcessor; +import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.schedulers.TestScheduler; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -75,4 +78,50 @@ public void byFlusher() { subscription.dispose(); } + + @Test + public void byTimerNotInterruptedException() throws InterruptedException { + PublishProcessor publisher = PublishProcessor.create(); + AtomicBoolean hasError = new AtomicBoolean(false); + CountDownLatch firstConsume = new CountDownLatch(1); + CountDownLatch consumeTwice = new CountDownLatch(2); + + Consumer> listConsumer = (List a) -> { + + try { + firstConsume.countDown(); + Thread.sleep(500); + } catch (InterruptedException e) { + hasError.set(true); + } finally { + consumeTwice.countDown(); + } + }; + Disposable subscribe = publisher + .compose(source -> + new FlowableBufferTimedFlushable<>( + source, + PublishProcessor.create(), + 1, + TimeUnit.SECONDS, + 4, + Schedulers.newThread(), + ArrayListSupplier.asSupplier() + )).subscribe(listConsumer); + + publisher.offer(1); + publisher.offer(2); + publisher.offer(3); + + firstConsume.await(); + publisher.offer(4); + publisher.offer(5); + publisher.offer(6); + publisher.offer(7); + + consumeTwice.countDown(); + Assertions.assertThat(hasError.get()).isFalse(); + subscribe.dispose(); + } + }