From d4eff0e962e032a3adc9f81c295cb3e6fc0e4581 Mon Sep 17 00:00:00 2001 From: Daniel Zhang Date: Mon, 13 Jan 2025 16:44:54 +0800 Subject: [PATCH 1/2] fix: Issue/806 * fix: Issue/806 --- .../FlowableBufferTimedFlushable.java | 28 ++++++++- .../FlowableBufferTimedFlushableTest.java | 61 +++++++++++++++++-- 2 files changed, 81 insertions(+), 8 deletions(-) diff --git a/client/src/main/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushable.java b/client/src/main/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushable.java index c487d8980be..b58bca87f36 100644 --- a/client/src/main/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushable.java +++ b/client/src/main/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushable.java @@ -23,6 +23,8 @@ import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; import io.reactivex.rxjava3.internal.util.QueueDrainHelper; import io.reactivex.rxjava3.subscribers.SerializedSubscriber; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -103,6 +105,8 @@ static final class BufferExactBoundedSubscriber actual, Supplier bufferSupplier, @@ -171,8 +175,16 @@ public void onNext(T t) { producerIndex++; } - if (restartTimerOnMaxSize) { - timer.dispose(); + // If try lock fails, it indicates that the timer is doing run logic, + // so there is no need to dispose it. + if (timerLock.tryLock()) { + try { + if (restartTimerOnMaxSize) { + timer.dispose(); + } + } finally { + timerLock.unlock(); + } } fastPathOrderedEmitMax(b, false, this); @@ -257,6 +269,18 @@ public boolean isDisposed() { @Override public void run() { + // If try lock fails, it indicates that the timer is doing dispose, + // so there is no need to do the actual running action. + if (timerLock.tryLock()) { + try { + doRun(); + } finally { + timerLock.unlock(); + } + } + } + + public void doRun() { U next; try { diff --git a/client/src/test/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushableTest.java b/client/src/test/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushableTest.java index 4d9b248df8a..89ec98ee3f7 100644 --- a/client/src/test/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushableTest.java +++ b/client/src/test/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushableTest.java @@ -1,16 +1,19 @@ package com.influxdb.client.internal.flowable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; - import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.functions.Consumer; import io.reactivex.rxjava3.internal.util.ArrayListSupplier; import io.reactivex.rxjava3.processors.PublishProcessor; +import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.schedulers.TestScheduler; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -75,4 +78,50 @@ public void byFlusher() { subscription.dispose(); } + + @Test + public void byTimerNotInterruptedException() throws InterruptedException { + PublishProcessor publisher = PublishProcessor.create(); + AtomicBoolean hasError = new AtomicBoolean(false); + CountDownLatch firstConsume = new CountDownLatch(1); + CountDownLatch consumeTwice = new CountDownLatch(2); + + Consumer> listConsumer = (List a) -> { + + try { + firstConsume.countDown(); + Thread.sleep(500); + } catch (InterruptedException e) { + hasError.set(true); + } finally { + consumeTwice.countDown(); + } + }; + Disposable subscribe = publisher + .compose(source -> + new FlowableBufferTimedFlushable<>( + source, + PublishProcessor.create(), + 1, + TimeUnit.SECONDS, + 4, + Schedulers.newThread(), + ArrayListSupplier.asSupplier() + )).subscribe(listConsumer); + + publisher.offer(1); + publisher.offer(2); + publisher.offer(3); + + firstConsume.await(); + publisher.offer(4); + publisher.offer(5); + publisher.offer(6); + publisher.offer(7); + + consumeTwice.countDown(); + Assertions.assertThat(hasError.get()).isFalse(); + subscribe.dispose(); + } + } From e2bdf03e3aec805daefd6d4433942a928077bcc0 Mon Sep 17 00:00:00 2001 From: Daniel Zhang Date: Wed, 15 Jan 2025 10:14:50 +0800 Subject: [PATCH 2/2] fix: Issue/806 * docs: update CHANGELOG.md --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 921759e2fd1..eb67698ff91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ ## 7.3.0 [unreleased] +### Bug Fixes + +- [#807](https://github.com/influxdata/influxdb-client-java/pull/807): fix: Issue/806: Add a lock to prevent the disposal of the timer while it is running. + + ### Dependencies ⚠️ Important Notice: Starting from this release, we won’t be listing every dependency change in our changelog. This helps us maintain the project faster and focus on important features for our InfluxDB client.